Skip to main content
Version: 1.0

Stream Feature View with Spark

A Stream Feature View with Spark defines transformations with mode='pyspark' or mode='sparksql'against a Stream Data Source and can compute features in near-real-time. It processes raw data from a streaming source (e.g. Kafka or Kinesis) and can be backfilled from a batch source that contains a historical log of stream events.

Tecton executes materialization jobs of Stream Feature View(s) with Spark on your connected data platform.

For information about connecting data sources to Spark, see
Connect Data Sources to Spark.

Stream Feature Transformations​

Stream Feature Views can define row-level transformations for filtering and projection, and aggregations using Tecton's Aggregation Engine.

Row-Level Transformations​

Stream Feature Views can run row-level Spark SQL or PySpark transformations:

from tecton import stream_feature_view, Attribute
from tecton.types import Float64


@stream_feature_view(
source=transactions_stream,
entities=[user],
mode="spark_sql",
timestamp_field="TIMESTAMP",
features=[Attribute("AMOUNT", Float64)],
)
def user_last_transaction_amount(transactions_stream):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions_stream}
"""

Sliding Window aggregations​

Stream Feature Views can also run aggregation transformations on Tecton's Aggregation Engine:

from tecton import stream_feature_view, Aggregate
from tecton.types import Float64
from datetime import datetime, timedelta


@stream_feature_view(
source=transactions_stream,
entities=[user],
mode="spark_sql",
online=True,
offline=True,
feature_start_time=datetime(2020, 10, 10),
batch_schedule=timedelta(days=1),
aggregation_interval=timedelta(minutes=10),
timestamp_field="TIMESTAMP",
features=[
Aggregate(
input_column=Field("AMOUNT", Float64),
function="mean",
time_window=timedelta(hours=1),
name="average_amount_1h",
),
Aggregate(
input_column=Field("AMOUNT", Float64),
function="mean",
time_window=timedelta(hours=12),
name="average_amount_12h",
),
Aggregate(
input_column=Field("AMOUNT", Float64),
function="mean",
time_window=timedelta(hours=24),
name="average_amount_24h",
),
],
)
def user_transaction_amount_averages(transactions_stream):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions_stream}
"""

The aggregation_interval parameter defines the interval by which the window advances. Tecton will update the feature value in the online store after the aggregation interval has elapsed, assuming there was at least one event for that key.

Continuous Window aggregations​

If you need streaming features that update within seconds (and not every aggregation_interval), you can use continuous time windows by setting stream_processing_mode to StreamProcessingMode.CONTINUOUS:

from tecton import stream_feature_view, Aggregate, StreamProcessingMode
from tecton.types import Int32
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta

# The following defines a continuous streaming feature
# It counts the number of non-fraudulent transactions per user over a 1min, 5min and 1h time window
# The expected freshness for these features is ~5 seconds
@stream_feature_view(
source=transactions_stream,
entities=[user],
mode="spark_sql",
stream_processing_mode=StreamProcessingMode.CONTINUOUS,
timestamp_field="timestamp",
features=[
Aggregate(input_column=Field("transaction", Int32), function="count", time_window=timedelta(minutes=1)),
Aggregate(input_column=Field("transaction", Int32), function="count", time_window=timedelta(minutes=30)),
Aggregate(input_column=Field("transaction", Int32), function="count", time_window=timedelta(hours=1)),
],
online=True,
offline=True,
feature_start_time=datetime(2022, 5, 1),
description="Number of transactions a user has made recently",
)
def user_continuous_transaction_count(transactions):
return f"""
SELECT
user_id,
1 as transaction,
timestamp
FROM
{transactions}
"""

How to choose between continuous and sliding windows​

The stream_processing_mode parameter defines how quickly Stream Feature Views will process new events. The processing mode options are StreamProcessingMode.TIME_INTERVAL and StreamProcessingMode.CONTINUOUS. The default is StreamProcessingMode.TIME_INTERVAL.

When to Use StreamProcessingMode.CONTINUOUS​

You should use StreamProcessingMode.CONTINUOUS if your model performance depends on updating features within seconds. For example, it may be important for a fraud detection use-case for features to include previous transactions made even a few seconds prior.

When using continuous processing, Tecton will store all transformed events in the online store and run the full aggregations at the time of request.

note

If second-level feature freshness is not good enough for your use case, and you need millisecond-level freshness, please consider using Tecton's Stream Ingest API.

You should use the default StreamProcessingMode.TIME_INTERVAL to optimize costs if your model can tolerate features updating less quickly; with StreamProcessingMode.TIME_INTERVAL:

  • Feature Views that use time-windowed aggregations can have a minimum interval of 1 minute
  • Feature Views that do not use time-windowed aggregations will typically take 1 minute to update feature values

StreamProcessingMode.CONTINUOUS may lead to higher infrastructure costs due to more frequent feature writes and checkpointing updates. Using a longer aggregation interval can have lower costs, especially if a single key may have multiple events in a short period of time that can be grouped into a single interval.

Checkpointing costs with StreamProcessingMode.CONTINUOUS on EMR

StreamProcessingMode.CONTINUOUS can cause significant S3 costs for customers using Tecton with EMR due to the frequency of writing Spark Streaming checkpoints to S3. The Databricks implementation of Spark Streaming has much lower checkpointing costs.

Productionizing a Stream​

For a Stream Feature View used in production where late data loss is unacceptable, it is recommended to set the Stream Data Source watermark_delay_threshold to your stream retention period, or at least 24 hours. This will configure Spark Structured Streaming to not drop data in the event that it processes the events late or out-of-order. The tradeoff of a longer watermark delay is greater amount of in-memory state used by the streaming job.

The ttl (time-to-live) parameter​

The value of ttl (a Stream Feature View parameter) affects the availability of feature data in the online store, the generation of training feature data, and the deletion of feature values from the online store.

For more details, see The ttl Parameter in Feature Views.

Was this page helpful?