
spark_stream_config

Summary

Declare a tecton.SparkStreamConfig for configuring a Stream Source with a Data Source Function. The function takes in a SparkSession and returns a streaming DataFrame.
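
To illustrate just the contract (a SparkSession in, a streaming DataFrame out), here is a minimal sketch; Spark's built-in "rate" source is used purely as a stand-in for a real stream, and the function name is a placeholder:

from tecton import spark_stream_config

@spark_stream_config()
def minimal_stream_source(spark):
    # The "rate" source emits rows with `timestamp` and `value` columns;
    # it stands in here for a real stream such as Kinesis or Kafka.
    return spark.readStream.format("rate").load()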


Example

from tecton import spark_stream_config


def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StructField, StringType

    # The raw Kinesis record arrives as a JSON-encoded binary `data` column.
    payload_schema = StructType([
        StructField("user_id", StringType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("category", StringType(), False),
        StructField("amt", StringType(), False),
        StructField("timestamp", StringType(), False),
    ])

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


@spark_stream_config()
def kinesis_data_source_function(spark):
    from datetime import timedelta

    options = {
        "streamName": "<stream name>",
        "roleArn": "<role ARN>",
        "region": "<region>",
        "shardFetchInterval": "30s",
        "initialPosition": "latest",
    }
    reader = spark.readStream.format("kinesis").options(**options)
    df = reader.load()
    df = raw_data_deserialization(df)
    # Use total_seconds(): timedelta.seconds is only the seconds component
    # and evaluates to 0 for a 24-hour delta.
    watermark = "{} seconds".format(int(timedelta(hours=24).total_seconds()))
    df = df.withWatermark("timestamp", watermark)
    return df
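
A Data Source Function like this is typically passed as the stream_config of a StreamSource. A minimal sketch of that wiring, assuming a Hive table backs the batch side (the database and table names below are placeholders):

from tecton import StreamSource, HiveConfig

transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=kinesis_data_source_function,
    batch_config=HiveConfig(
        database="<database>",
        table="<table>",
        timestamp_field="timestamp",
    ),
)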
