Skip to main content
Version: 0.8

Stream Feature View Examples

Row-Level SQL Transformation​

from tecton import stream_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    description="Last user transaction amount (stream calculated)",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
)
def last_transaction_amount_sql(transactions):
    """Return the row-level Spark SQL query for this feature view.

    ``transactions`` is interpolated into the FROM clause. The query only
    projects the ``timestamp``, ``user_id`` and ``amt`` columns; it applies
    no filtering or aggregation.
    """
    # The SQL text is kept flush-left so the generated string is unchanged.
    query = f"""
SELECT
timestamp,
user_id,
amt
FROM
{transactions}
"""
    return query

Row-Level PySpark Transformation​

from tecton import stream_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pyspark",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    description="Last user transaction amount (stream calculated)",
)
def last_transaction_amount_pyspark(transactions):
    """Return the row-level PySpark projection for this feature view.

    ``transactions`` is the input DataFrame. The result keeps only the
    ``timestamp``, ``user_id`` and ``amt`` columns; no rows are filtered
    or aggregated.
    """
    # No pyspark.sql.functions helpers are needed for a plain column
    # projection, so the previously unused import has been dropped.
    return transactions.select("timestamp", "user_id", "amt")

Time-Windowed Aggregations​

from tecton import stream_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    # Defines how frequently feature values get updated in the online store
    aggregation_interval=timedelta(minutes=10),
    # Defines how frequently batch jobs are scheduled to ingest into the offline store
    batch_schedule=timedelta(days=1),
    # Sum and mean of `amt`, each over 1-hour, 1-day and 3-day windows
    # (all "sum" windows first, then all "mean" windows).
    aggregations=[
        Aggregation(column="amt", function=agg_function, time_window=window)
        for agg_function in ("sum", "mean")
        for window in (timedelta(hours=1), timedelta(days=1), timedelta(days=3))
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Transaction amount statistics and total over a series of time windows, updated every 10 minutes.",
)
def user_transaction_amount_metrics(transactions):
    """Return the Spark SQL query that feeds the windowed aggregations.

    ``transactions`` is interpolated into the FROM clause. The query projects
    ``user_id``, ``amt`` and ``timestamp``; the sum/mean aggregations
    themselves are declared in the decorator, not in the SQL.
    """
    # The SQL text is kept flush-left so the generated string is unchanged.
    query = f"""
SELECT
user_id,
amt,
timestamp
FROM
{transactions}
"""
    return query

Time-Windowed Aggregations with StreamProcessingMode.CONTINUOUS​

Please see the Stream Processing Mode documentation for details on how StreamProcessingMode works.

Feature Definition Example​

  • Set stream_processing_mode=StreamProcessingMode.CONTINUOUS to enable continuous event processing.
  • Optionally set instance_availability="on_demand" within the stream_cluster_config. Spot Instances may delay feature processing when an instance is terminated or while waiting for an available instance, so On-Demand Instances deliver more consistent performance.

This example Feature View shows how to configure the decorator parameters.

from tecton import stream_feature_view, FilteredSource, Aggregation, StreamProcessingMode
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta

# The following defines a continuous streaming feature
# It counts the number of transactions per user over 1-minute, 30-minute and 1-hour time windows
# The expected freshness for these features is ~5 seconds
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    stream_processing_mode=StreamProcessingMode.CONTINUOUS,
    # Count of the constant `transaction` column over 1-minute, 30-minute
    # and 1-hour windows.
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=window)
        for window in (timedelta(minutes=1), timedelta(minutes=30), timedelta(hours=1))
    ],
    description="Number of transactions a user has made recently",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
)
def user_continuous_transaction_count(transactions):
    """Return the Spark SQL query that feeds the continuous count aggregations.

    ``transactions`` is interpolated into the FROM clause. Each input row is
    emitted with ``user_id``, ``timestamp`` and a constant ``transaction``
    column equal to 1, which the decorator's "count" aggregations consume.
    """
    # The SQL text is kept flush-left so the generated string is unchanged.
    query = f"""
SELECT
user_id,
1 as transaction,
timestamp
FROM
{transactions}
"""
    return query

Stream Data Source Configuration​

If your stream data source is Kinesis, we suggest lowering the default buffering to avoid delays in event processing.

Here are some suggested parameter values for a KinesisConfig:

  • For Databricks users:

    • maxFetchDuration="200ms"
    • maxFetchRate="2"
    • minFetchPeriod="200ms"
  • For EMR users:

    • kinesis.executor.idleTimeBetweenReadsInMs="200"
    • kinesis.executor.maxFetchTimeInMs="200"

This example data source shows how to configure the stream options on a KinesisConfig with Databricks.

from tecton import (
HiveConfig,
KinesisConfig,
StreamSource,
BatchSource,
DatetimePartitionColumn,
)
from datetime import timedelta


def raw_data_deserialization(df):
    """Parse raw stream records into typed transaction columns.

    The ``data`` column of ``df`` is cast to a string, parsed as JSON using
    a schema in which every field is a string, and then projected into flat
    columns. ``amt``, ``merch_lat`` and ``merch_long`` are cast to double,
    ``is_fraud`` is cast to long, and ``timestamp`` is converted with
    ``from_utc_timestamp(..., "UTC")``.

    Returns the resulting DataFrame with columns ``user_id``,
    ``transaction_id``, ``category``, ``amt``, ``is_fraud``, ``merchant``,
    ``merch_lat``, ``merch_long`` and ``timestamp``.
    """
    # Imports stay local to the function, as in the original example; only
    # the names that are actually used are imported.
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StringType, StructField, StructType

    # Every payload field is read as a string; numeric and timestamp
    # conversions are applied explicitly in the final select below.
    payload_schema = StructType(
        [
            StructField("user_id", StringType(), False),
            StructField("transaction_id", StringType(), False),
            StructField("category", StringType(), False),
            StructField("amt", StringType(), False),
            StructField("is_fraud", StringType(), False),
            StructField("merchant", StringType(), False),
            StructField("merch_lat", StringType(), False),
            StructField("merch_long", StringType(), False),
            StructField("timestamp", StringType(), False),
        ]
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            col("payload.is_fraud").cast("long").alias("is_fraud"),
            col("payload.merchant").alias("merchant"),
            col("payload.merch_lat").cast("double").alias("merch_lat"),
            col("payload.merch_long").cast("double").alias("merch_long"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


# One zero-padded partition column per date part: partition_0 is the year,
# partition_1 the month and partition_2 the day.
partition_columns = [
    DatetimePartitionColumn(column_name=f"partition_{index}", datepart=datepart, zero_padded=True)
    for index, datepart in enumerate(("year", "month", "day"))
]

# Hive table configuration, passed as `batch_config` to both the stream
# source and the batch source below.
batch_config = HiveConfig(
    table="transactions",
    database="demo_fraud_v2",
    datetime_partition_columns=partition_columns,
    timestamp_field="timestamp",
)


# Stream source that reads transactions from Kinesis and uses the Hive table
# in `batch_config` as its batch counterpart.
transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=KinesisConfig(
        stream_name="tecton-demo-fraud-data-stream",
        region="us-west-2",
        initial_stream_position="latest",
        watermark_delay_threshold=timedelta(hours=24),
        timestamp_field="timestamp",
        # Turns the raw record payload into typed transaction columns.
        post_processor=raw_data_deserialization,
        options={
            "roleArn": "arn:aws:iam::706752053316:role/tecton-demo-fraud-data-cross-account-kinesis-ro",
            # Lower the default Kinesis buffering (Databricks option names)
            # to avoid delays in event processing.
            "maxFetchDuration": "200ms",
            "maxFetchRate": "2",
            "minFetchPeriod": "200ms",
        },
    ),
    batch_config=batch_config,
)

# Batch-only source that reuses the same Hive table configuration as the
# stream source.
transactions_batch = BatchSource(
    batch_config=batch_config,
    name="transactions_batch",
)

Was this page helpful?