tecton.KinesisDSConfig
class tecton.KinesisDSConfig(stream_name, region, raw_stream_translator, timestamp_key, default_initial_stream_position, default_watermark_delay_threshold, deduplication_columns=None, options=None)

Configuration used to reference a Kinesis stream.
The KinesisDSConfig class is used to create a reference to an AWS Kinesis stream.

This class is used as an input to a StreamDataSource's parameter stream_ds_config. This class is not a Tecton Object: it is a grouping of parameters. Declaring this class alone will not register a data source. Instead, declare it as part of a StreamDataSource that takes this configuration class instance as a parameter (a sketch of such a declaration follows the example below).

Methods
__init__ – Instantiates a new KinesisDSConfig.
__init__(stream_name, region, raw_stream_translator, timestamp_key, default_initial_stream_position, default_watermark_delay_threshold, deduplication_columns=None, options=None)

Instantiates a new KinesisDSConfig.
Parameters

- stream_name (str) – Name of the Kinesis stream.
- region (str) – AWS region of the stream, e.g. "us-west-2".
- raw_stream_translator – Python user-defined function f(DataFrame) -> DataFrame that takes in the raw PySpark data source DataFrame and translates it to the DataFrame to be consumed by the Feature View. See an example of raw_stream_translator in the User Guide.
- timestamp_key (str) – Name of the column containing the timestamp used for watermarking.
- default_initial_stream_position (str) – Initial position in the stream, e.g. "latest" or "trim_horizon". More information is available in the Spark Kinesis documentation.
- default_watermark_delay_threshold (str) – Watermark time interval, e.g. "24 hours", used by Spark Structured Streaming to account for late-arriving data. See: https://docs.tecton.ai/v2/overviews/framework/feature_views/stream_feature_view.html#productionizing-a-stream
- deduplication_columns (Optional[List[str]]) – (Optional) Columns in the stream data that uniquely identify data records. Used for de-duplicating.
- options (Optional[Dict[str, str]]) – (Optional) A map of additional Spark readStream options.

Returns

A KinesisDSConfig class instance.
Example of a KinesisDSConfig declaration:
import pyspark
from tecton import KinesisDSConfig

# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add('amount', StringType(), False)
        .add('isFraud', StringType(), False)
        .add('timestamp', StringType(), False)
    )

    return (
        df.selectExpr('cast (data as STRING) jsonData')
        .select(from_json('jsonData', payload_schema).alias('payload'))
        .select(
            col('payload.amount').cast('long').alias('amount'),
            col('payload.isFraud').cast('long').alias('isFraud'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

# Declare a KinesisDSConfig instance object that can be used as an argument in `StreamDataSource`
stream_ds_config = KinesisDSConfig(
    stream_name='transaction_events',
    region='us-west-2',
    default_initial_stream_position='latest',
    default_watermark_delay_threshold='30 minutes',
    timestamp_key='timestamp',
    raw_stream_translator=raw_data_deserialization,
    options={'roleArn': 'arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro'}
)
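The configuration object above does not register a data source on its own; it is consumed by a StreamDataSource. The following is a minimal sketch of that registration step, not part of this reference entry: it assumes StreamDataSource accepts name, stream_ds_config, and batch_ds_config parameters, and that a batch data source configuration named batch_config (for example a HiveDSConfig) is declared elsewhere in the feature repository.

from tecton import StreamDataSource

# Sketch only: `batch_config` is assumed to be a batch data source configuration
# (e.g. a HiveDSConfig) declared elsewhere; the parameter names are assumptions
# based on the StreamDataSource description above.
transactions_stream = StreamDataSource(
    name='transactions_stream',
    stream_ds_config=stream_ds_config,  # the KinesisDSConfig instance declared above
    batch_ds_config=batch_config,       # assumed pre-existing batch configuration
)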