KinesisConfig
Summary
The KinesisConfig class is used to create a reference to an AWS Kinesis stream.
This class is used as the input to a StreamSource's `stream_config` parameter. Declaring this configuration class alone does not register a Data Source. Instead, declare it as part of a StreamSource that takes this configuration class instance as a parameter.
Example
```python
import pyspark
from tecton import KinesisConfig


# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add("amount", StringType(), False)
        .add("isFraud", StringType(), False)
        .add("timestamp", StringType(), False)
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.amount").cast("long").alias("amount"),
            col("payload.isFraud").cast("long").alias("isFraud"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


# Declare KinesisConfig instance object that can be used as argument in `StreamSource`
stream_config = KinesisConfig(
    stream_name="transaction_events",
    region="us-west-2",
    initial_stream_position="latest",
    timestamp_field="timestamp",
    post_processor=raw_data_deserialization,
    options={"roleArn": "arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro"},
)
```
Methods
| Name | Description |
|---|---|
| `__init__(...)` | Instantiates a new KinesisConfig. |
__init__(...)
Instantiates a new KinesisConfig.

Parameters

- `stream_name` (`str`) - Name of the Kinesis stream.
- `region` (`str`) - AWS region of the stream, e.g. "us-west-2".
- `post_processor` (`Callable`) - Python user-defined function `f(DataFrame) -> DataFrame` that takes in the raw PySpark data source DataFrame and translates it into the DataFrame to be consumed by the Feature View. See an example of `post_processor` in the User Guide.
- `timestamp_field` (`str`) - Name of the column containing the timestamp used for watermarking.
- `initial_stream_position` (`str`) - Initial position in the stream, e.g. "latest" or "trim_horizon". More information is available in the Spark Kinesis documentation.
- `watermark_delay_threshold` (`timedelta`) - Watermark time interval, e.g. `timedelta(hours=36)`, used by Spark Structured Streaming to account for late-arriving data. See: Productionizing a Stream. Default: `24h`
- `deduplication_columns` (`Optional[List[str]]`) - Columns in the stream data that uniquely identify data records. Used for de-duplicating. Default: `None`
- `options` (`Optional[Dict[str, str]]`) - A map of additional Spark `readStream` options. Default: `None`
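The interplay of `timestamp_field` and `watermark_delay_threshold` can be illustrated with a simplified, per-record model of a Spark Structured Streaming watermark: the watermark trails the latest event time seen so far by the threshold, and events older than it are treated as too late. This is a sketch under stated assumptions, not Tecton or Spark code: Spark advances the watermark between micro-batches rather than per record, and it only guarantees that data within the threshold is kept (later data may or may not be dropped). The function name `filter_late_events` is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple

# (value of timestamp_field, payload) pairs, in arrival order
Event = Tuple[datetime, str]


def filter_late_events(
    events: Iterable[Event], delay: timedelta = timedelta(hours=24)
) -> List[Event]:
    """Drop events that fall behind a running watermark.

    watermark = latest event time seen so far - delay, where `delay` plays
    the role of watermark_delay_threshold (the 24h default mirrors the
    documented default). Simplified: updated per record, not per micro-batch.
    """
    max_event_time: Optional[datetime] = None
    kept: List[Event] = []
    for event_time, payload in events:
        if max_event_time is not None and event_time < max_event_time - delay:
            continue  # older than the current watermark: treated as too late
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        kept.append((event_time, payload))
    return kept


# Usage: the event arriving 30 hours behind the latest one is dropped.
now = datetime(2023, 5, 2, 12, 0)
arrivals = [
    (now, "a"),
    (now - timedelta(hours=30), "b"),
    (now - timedelta(hours=10), "c"),
]
on_time = filter_late_events(arrivals)
```

A larger threshold tolerates later data at the cost of keeping more streaming state around.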
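A minimal sketch of what `deduplication_columns` means: records that share the same values in all of those columns are treated as duplicates. The sketch keeps the first occurrence, which is an assumption (this reference does not say which copy is kept or how long deduplication state is retained). The function name `deduplicate` and the `txn_id` column in the usage example are hypothetical; the example stream's schema above has no unique ID column.

```python
from typing import Any, Dict, Iterable, List, Optional, Sequence


def deduplicate(
    rows: Iterable[Dict[str, Any]],
    deduplication_columns: Optional[Sequence[str]] = None,
) -> List[Dict[str, Any]]:
    """Keep the first row seen for each combination of key-column values.

    With deduplication_columns=None (the documented default), every row is kept.
    """
    rows = list(rows)
    if not deduplication_columns:
        return rows
    seen = set()
    unique: List[Dict[str, Any]] = []
    for row in rows:
        # Rows with identical values in all key columns count as duplicates.
        key = tuple(row.get(column) for column in deduplication_columns)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


# Usage with a hypothetical unique-ID column "txn_id":
events = [
    {"txn_id": "t1", "amount": 125},
    {"txn_id": "t2", "amount": 40},
    {"txn_id": "t1", "amount": 125},  # redelivered copy of t1
]
deduped = deduplicate(events, deduplication_columns=["txn_id"])
```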