tecton.declarative.spark_stream_config

tecton.declarative.spark_stream_config()

Declare a tecton.declarative.data_source.SparkStreamConfig for configuring a stream source with a Data Source Function. The function takes in a SparkSession and returns a streaming DataFrame.

Example defining a Data Source Function using spark_stream_config:

from datetime import timedelta

from tecton import spark_stream_config

def raw_data_deserialization(df):
    # Deserialize the raw Kinesis record payload (a JSON string) into typed columns.
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StructField, StringType
    payload_schema = StructType([
        StructField("user_id", StringType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("category", StringType(), False),
        StructField("amt", StringType(), False),
        StructField("timestamp", StringType(), False),
    ])
    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp")
        )
    )

@spark_stream_config()
def kinesis_data_source_function(spark):
    options = {
        "streamName": "<stream name>",
        "roleArn": "<role ARN>",
        "region": "<region>",
        "shardFetchInterval": "30s",
        "initialPosition": "latest"
    }
    # Read the raw stream from Kinesis, then deserialize and watermark it.
    reader = spark.readStream.format("kinesis").options(**options)
    df = reader.load()
    df = raw_data_deserialization(df)
    # Use total_seconds() here: timedelta(hours=24).seconds is the seconds
    # component only and evaluates to 0 for a whole number of days.
    watermark = "{} seconds".format(int(timedelta(hours=24).total_seconds()))
    df = df.withWatermark("timestamp", watermark)
    return df
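
The decorated function is then passed as the stream_config of a StreamSource, typically paired with a batch_config that reads a historical log of the same events for backfills. Below is a minimal sketch, assuming a spark_batch_config-based Data Source Function; the source name and Parquet path are hypothetical placeholders:

from tecton import StreamSource, spark_batch_config

@spark_batch_config()
def transactions_batch(spark):
    # Hypothetical batch mirror of the stream events, used for backfills.
    # Its schema should match the output of the stream Data Source Function.
    return spark.read.parquet("s3://<bucket>/transactions/")

transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=kinesis_data_source_function,
    batch_config=transactions_batch,
)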