KinesisConfig
Summary
Configuration used to reference a Kinesis stream.
The KinesisConfig class is used to create a reference to an AWS Kinesis stream. This class is used as the value of a StreamSource's stream_config parameter. Declaring this configuration class alone will not register a Data Source. Instead, declare it as part of a StreamSource that takes this configuration class instance as a parameter.
Example
import pyspark
from tecton import KinesisConfig


# Define a post-processor that deserializes the raw Kinesis records
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    # Schema of the JSON payload carried in each record's binary `data` column
    payload_schema = (
        StructType()
        .add("amount", StringType(), False)
        .add("isFraud", StringType(), False)
        .add("timestamp", StringType(), False)
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.amount").cast("long").alias("amount"),
            col("payload.isFraud").cast("long").alias("isFraud"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


# Declare a KinesisConfig instance that can be passed to a StreamSource as stream_config
stream_config = KinesisConfig(
    stream_name="transaction_events",
    region="us-west-2",
    initial_stream_position="latest",
    timestamp_field="timestamp",
    post_processor=raw_data_deserialization,
    options={"roleArn": "arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro"},
)
Methods
__init__(...)
Instantiates a new KinesisConfig.
Parameters
- stream_name (str) – Name of the Kinesis stream.
- region (str) – AWS region of the stream, e.g. us-west-2.
- post_processor – Python user-defined function f(DataFrame) -> DataFrame that takes in the raw PySpark data source DataFrame and translates it to the DataFrame to be consumed by the Feature View. See an example of post_processor in the User Guide.
- timestamp_field (str) – Name of the column containing the timestamp used for watermarking.
- initial_stream_position (str) – Initial position in the stream, e.g. latest or trim_horizon. More information is available in the Spark Kinesis documentation.
- watermark_delay_threshold (timedelta) – Watermark time interval, e.g. timedelta(hours=36), used by Spark Structured Streaming to account for late-arriving data. See: Productionizing a Stream. (Default: datetime.timedelta(days=1))
- deduplication_columns (Optional[List[str]]) – Columns in the stream data that uniquely identify data records. Used for de-duplicating. (Default: None)
- options (Optional[Dict[str, str]]) – A map of additional Spark readStream options. (Default: None)
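The interaction of timestamp_field, watermark_delay_threshold and deduplication_columns can be illustrated with a simplified model. The sketch below approximates Spark Structured Streaming's watermarking and de-duplication under stated assumptions. It is not Tecton's implementation. filter_stream is a hypothetical helper, and it processes one record at a time, whereas Spark advances the watermark between micro-batches. Any record older than the latest event time seen minus the threshold is treated as too late to keep.

```python
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Optional, Sequence


def filter_stream(
    records: Iterable[Dict],
    timestamp_field: str,
    watermark_delay_threshold: timedelta = timedelta(days=1),
    deduplication_columns: Optional[Sequence[str]] = None,
) -> List[Dict]:
    """Hypothetical, record-at-a-time model of watermarking plus de-duplication."""
    max_event_time: Optional[datetime] = None
    seen = set()
    kept = []
    for record in records:
        event_time = record[timestamp_field]
        # The watermark trails the latest event time seen so far by the threshold.
        if max_event_time is not None and event_time < max_event_time - watermark_delay_threshold:
            continue  # too late: older than the current watermark
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        if deduplication_columns:
            key = tuple(record[c] for c in deduplication_columns)
            if key in seen:
                continue  # duplicate of a record that was already kept
            seen.add(key)
        kept.append(record)
    return kept
```

With a one-day threshold, a record that arrives two days behind the latest event is dropped, while one that is 12 hours behind is kept. With deduplication_columns=["id"], a second record carrying an id that was already kept is also dropped.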
Returns
A KinesisConfig class instance.