spark_stream_config
Summary
Declare a tecton.SparkStreamConfig for configuring a Stream Source with a Data Source Function. The function takes in a SparkSession and returns a streaming DataFrame.
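A Data Source Function has one required shape: it accepts a SparkSession and returns a streaming DataFrame. A minimal sketch of that contract, using Spark's built-in rate source purely for illustration (the function name minimal_stream_config is hypothetical):

from tecton import spark_stream_config

@spark_stream_config()
def minimal_stream_config(spark):
    # Any streaming DataFrame satisfies the contract; the rate source
    # emits (timestamp, value) rows and is used here only to show the shape.
    return spark.readStream.format("rate").option("rowsPerSecond", 1).load()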
Example defining a Data Source Function using spark_stream_config:
from datetime import timedelta

from tecton import spark_stream_config


def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StructField, StringType

    # The raw Kinesis payload is a JSON string; parse it into typed columns.
    payload_schema = StructType([
        StructField("user_id", StringType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("category", StringType(), False),
        StructField("amt", StringType(), False),
        StructField("timestamp", StringType(), False),
    ])

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


@spark_stream_config()
def kinesis_data_source_function(spark):
    options = {
        "streamName": "<stream name>",
        "roleArn": "<role ARN>",
        "region": "<region>",
        "shardFetchInterval": "30s",
        "initialPosition": "latest",
    }
    reader = spark.readStream.format("kinesis").options(**options)
    df = reader.load()
    df = raw_data_deserialization(df)
    # Apply a 24-hour watermark on the event timestamp. total_seconds() is
    # used because timedelta(hours=24).seconds is 0 (the .seconds attribute
    # only holds the sub-day component of the duration).
    watermark = "{} seconds".format(int(timedelta(hours=24).total_seconds()))
    df = df.withWatermark("timestamp", watermark)
    return df
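The decorated function can then be wired into a Stream Source. A minimal sketch, assuming a tecton.StreamSource that takes stream_config and batch_config parameters, and a hypothetical HiveConfig pointing at an offline copy of the same transactions data (the database and table names are placeholders, not part of the example above):

from tecton import StreamSource, HiveConfig

transactions_stream = StreamSource(
    name="transactions_stream",
    # The Data Source Function defined above supplies the streaming side.
    stream_config=kinesis_data_source_function,
    # Hypothetical batch source used for backfills.
    batch_config=HiveConfig(
        database="demo_fraud",
        table="transactions",
        timestamp_field="timestamp",
    ),
)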