tecton.KinesisDSConfig

class tecton.KinesisDSConfig(stream_name, region, raw_stream_translator, timestamp_key, default_initial_stream_position, default_watermark_delay_threshold, deduplication_columns=None, options=None)

Configuration used to reference a Kinesis stream.

The KinesisDSConfig class is used to create a reference to an AWS Kinesis stream.

This class is used as an input to a StreamDataSource’s parameter stream_ds_config. This class is not a Tecton Object: it is a grouping of parameters. Declaring this class alone will not register a data source. Instead, declare it as part of a StreamDataSource that takes this configuration class instance as a parameter.

Methods

__init__

Instantiates a new KinesisDSConfig.

__init__(stream_name, region, raw_stream_translator, timestamp_key, default_initial_stream_position, default_watermark_delay_threshold, deduplication_columns=None, options=None)

Instantiates a new KinesisDSConfig.

Parameters
  • stream_name (str) – Name of the Kinesis stream.

  • region (str) – AWS region of the stream, e.g., “us-west-2”.

  • raw_stream_translator – Python user-defined function f(DataFrame) -> DataFrame that takes the raw PySpark data source DataFrame and translates it into the DataFrame to be consumed by the Feature View. See an example of raw_stream_translator in the User Guide.

  • timestamp_key (str) – Name of the column containing timestamp for watermarking.

  • default_initial_stream_position (str) – Initial position in the stream, e.g., “latest” or “trim_horizon”. More information is available in the Spark Kinesis documentation.

  • default_watermark_delay_threshold (str) – Watermark time interval, e.g., “24 hours”, used by Spark Structured Streaming to account for late-arriving data. See: https://docs.tecton.ai/v2/overviews/framework/feature_views/stream_feature_view.html#productionizing-a-stream

  • deduplication_columns (Optional[List[str]]) – (Optional) Columns in the stream data that uniquely identify data records. Used for de-duplicating.

  • options (Optional[Dict[str, str]]) – (Optional) A map of additional Spark readStream options.

Returns

A KinesisDSConfig class instance.
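To illustrate what deduplication_columns does conceptually, here is a minimal plain-Python sketch (not Tecton or Spark code; the record shape and column names are illustrative): records that share the same values in the listed columns are treated as duplicate deliveries, and only the first occurrence is kept.

```python
def deduplicate(records, dedup_columns):
    """Keep the first record for each unique combination of dedup_columns."""
    seen = set()
    deduped = []
    for record in records:
        key = tuple(record[col] for col in dedup_columns)
        if key not in seen:
            seen.add(key)
            deduped.append(record)
    return deduped


events = [
    {'txn_id': 'a1', 'amount': 10},
    {'txn_id': 'a1', 'amount': 10},  # duplicate delivery of a1
    {'txn_id': 'b2', 'amount': 25},
]
deduped = deduplicate(events, ['txn_id'])  # only a1 and b2 remain
```

In the real stream, Spark Structured Streaming performs this de-duplication within the watermark window rather than over an in-memory set.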

Example of a KinesisDSConfig declaration:

import pyspark
from tecton import KinesisDSConfig


# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add('amount', StringType(), False)
        .add('isFraud', StringType(), False)
        .add('timestamp', StringType(), False)
    )

    return (
        df.selectExpr('cast (data as STRING) jsonData')
        .select(from_json('jsonData', payload_schema).alias('payload'))
        .select(
            col('payload.amount').cast('long').alias('amount'),
            col('payload.isFraud').cast('long').alias('isFraud'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

# Declare a KinesisDSConfig instance that can be used as an argument to `StreamDataSource`
stream_ds_config = KinesisDSConfig(
    stream_name='transaction_events',
    region='us-west-2',
    default_initial_stream_position='latest',
    default_watermark_delay_threshold='30 minutes',
    timestamp_key='timestamp',
    raw_stream_translator=raw_data_deserialization,
    options={'roleArn': 'arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro'}
)
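As noted above, this configuration only registers a data source when passed to a StreamDataSource. A sketch of that wiring might look like the following; the batch_ds_config argument and the batch_config object it receives are assumptions for illustration (Tecton stream sources pair a stream configuration with a batch configuration used for backfills), not a definitive signature.

```python
from tecton import StreamDataSource

# Sketch only: wraps the stream configuration in a registered data source.
# `batch_config` is a hypothetical batch source configuration (e.g. built
# from a batch DS config class) assumed to be declared elsewhere in the
# feature repository.
transactions_stream = StreamDataSource(
    name='transactions_stream',
    batch_ds_config=batch_config,       # assumed to exist elsewhere
    stream_ds_config=stream_ds_config,  # the instance declared above
)
```

Feature Views can then reference transactions_stream as their input source.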