Version: Beta 🚧

KinesisConfig

Summary

The KinesisConfig class creates a reference to an AWS Kinesis stream.

Pass a KinesisConfig instance as the stream_config parameter of a StreamSource. Declaring a KinesisConfig on its own does not register a Data Source; it takes effect only as part of a StreamSource that receives the instance as a parameter.

Example

import pyspark
from tecton import KinesisConfig


# Define the post-processor that deserializes the raw stream
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add("amount", StringType(), False)
        .add("isFraud", StringType(), False)
        .add("timestamp", StringType(), False)
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.amount").cast("long").alias("amount"),
            col("payload.isFraud").cast("long").alias("isFraud"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


# Declare a KinesisConfig instance that can be passed as the `stream_config` argument of `StreamSource`
stream_config = KinesisConfig(
    stream_name="transaction_events",
    region="us-west-2",
    initial_stream_position="latest",
    timestamp_field="timestamp",
    post_processor=raw_data_deserialization,
    options={"roleArn": "arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro"},
)
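
To make the post-processor's effect easier to see without a Spark cluster, the sketch below applies the same three steps (decode the `data` bytes, parse the JSON payload, cast the fields) to a single record in plain Python. The helper `deserialize_record` and the sample record are hypothetical, and the ISO-8601 timestamp format is an assumption, since the source does not state how timestamps are encoded in the payload.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def deserialize_record(data: bytes) -> Dict[str, Any]:
    """Plain-Python analogue of raw_data_deserialization for one record.

    Hypothetical helper for illustration only; it is not part of Tecton.
    """
    # Equivalent of: cast(data as STRING) followed by from_json(...)
    payload = json.loads(data.decode("utf-8"))
    return {
        # Equivalent of: col("payload.amount").cast("long")
        "amount": int(payload["amount"]),
        # Equivalent of: col("payload.isFraud").cast("long")
        "isFraud": int(payload["isFraud"]),
        # Assumption: the timestamp is an ISO-8601 string without an offset,
        # interpreted as UTC.
        "timestamp": datetime.fromisoformat(payload["timestamp"]).replace(
            tzinfo=timezone.utc
        ),
    }


# Sample record shaped like the payload_schema above (all fields are strings).
raw = b'{"amount": "125", "isFraud": "0", "timestamp": "2024-01-03T12:00:00"}'
row = deserialize_record(raw)
```

Error handling differs from the Spark version: this sketch raises an exception on malformed input, so it is useful only for reasoning about the shape of the output columns.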

Methods

Name            Description
__init__(...)   Instantiates a new KinesisConfig.

__init__(...)

Instantiates a new KinesisConfig.

Parameters

  • stream_name (str) - Name of the Kinesis stream.

  • region (str) - AWS region of the stream, e.g., "us-west-2".

  • post_processor - Python user-defined function f(DataFrame) -> DataFrame that takes the raw PySpark data source DataFrame and translates it into the DataFrame consumed by the Feature View. See an example of post_processor in the User Guide.

  • timestamp_field (str) - Name of the column containing the timestamp used for watermarking.

  • initial_stream_position (str) - Initial position in the stream, e.g., "latest" or "trim_horizon". More information is available in the Spark Kinesis documentation.

  • watermark_delay_threshold (timedelta) - Watermark time interval, e.g., timedelta(hours=36), used by Spark Structured Streaming to account for late-arriving data. See: Productionizing a Stream. Default: 24 hours (timedelta(days=1))

  • deduplication_columns (Optional[List[str]]) - Columns in the stream data that uniquely identify data records. Used for de-duplicating. Default: None

  • options (Optional[Dict[str, str]]) - A map of additional Spark readStream options. Default: None
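
The watermark_delay_threshold and deduplication_columns parameters can be pictured with a simplified, per-record model in plain Python. This is a sketch only: `filter_stream` and the sample events are hypothetical, and Spark Structured Streaming advances its watermark per micro-batch rather than per record, so real behavior will differ in detail.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List, Optional


def filter_stream(
    records: Iterable[Dict[str, Any]],
    timestamp_field: str,
    watermark_delay_threshold: timedelta,
    deduplication_columns: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Simplified model of watermarking plus de-duplication.

    Hypothetical helper for illustration; not Tecton's or Spark's code.
    """
    max_event_time: Optional[datetime] = None
    seen_keys = set()
    kept = []
    for record in records:
        event_time = record[timestamp_field]
        # The watermark trails the newest event time seen so far.
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - watermark_delay_threshold
        if event_time < watermark:
            continue  # arrived later than the threshold allows: dropped
        if deduplication_columns:
            key = tuple(record[c] for c in deduplication_columns)
            if key in seen_keys:
                continue  # same identifying columns already seen: dropped
            seen_keys.add(key)
        kept.append(record)
    return kept


events = [
    {"id": "a", "timestamp": datetime(2024, 1, 3, 12, 0)},
    {"id": "b", "timestamp": datetime(2024, 1, 3, 0, 0)},   # 12h late
    {"id": "a", "timestamp": datetime(2024, 1, 3, 12, 0)},  # duplicate
    {"id": "c", "timestamp": datetime(2024, 1, 1, 0, 0)},   # 60h late
]

kept_24h = [
    r["id"]
    for r in filter_stream(events, "timestamp", timedelta(hours=24), ["id", "timestamp"])
]
kept_72h = [
    r["id"]
    for r in filter_stream(events, "timestamp", timedelta(hours=72), ["id", "timestamp"])
]
```

With the default 24-hour threshold the record that is 60 hours late is discarded, while a 72-hour threshold keeps it. In this model a larger threshold tolerates later data at the cost of tracking more state.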

Returns

A KinesisConfig class instance.
