Stream Feature View with Spark
A Stream Feature View with Spark defines transformations with mode='pyspark' or mode='spark_sql' against a Stream Data Source and can compute features in near-real-time. It processes raw data from a streaming source (e.g. Kafka or Kinesis) and can be backfilled from a batch source that contains a historical log of stream events.
Tecton executes materialization jobs of Stream Feature Views with Spark on your connected data platform.
Stream Feature Transformations
Stream Feature Views can define row-level transformations for filtering and projection, as well as aggregations using Tecton's Aggregation Engine.
Row-Level Transformations
Stream Feature Views can run row-level Spark SQL or PySpark transformations:
from tecton import stream_feature_view, Attribute
from tecton.types import Float64
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    timestamp_field="TIMESTAMP",
    features=[Attribute("AMOUNT", Float64)],
)
def user_last_transaction_amount(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """
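The same row-level transformation can also be written with mode="pyspark". A minimal sketch (the function name is hypothetical; column names follow the example above), in which the function receives a Spark DataFrame and returns a DataFrame:

```python
from tecton import stream_feature_view, Attribute
from tecton.types import Float64
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pyspark",
    timestamp_field="TIMESTAMP",
    features=[Attribute("AMOUNT", Float64)],
)
def user_last_transaction_amount_pyspark(transactions_stream):
    # Row-level projection expressed as a DataFrame operation
    # instead of a SQL string.
    return transactions_stream.select("USER_ID", "AMOUNT", "TIMESTAMP")
```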
Sliding Window Aggregations
Stream Feature Views can also run aggregation transformations on Tecton's Aggregation Engine:
from tecton import stream_feature_view, Aggregate
from tecton.types import Field, Float64
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregation_interval=timedelta(minutes=10),
    timestamp_field="TIMESTAMP",
    features=[
        Aggregate(
            input_column=Field("AMOUNT", Float64),
            function="mean",
            time_window=timedelta(hours=1),
            name="average_amount_1h",
        ),
        Aggregate(
            input_column=Field("AMOUNT", Float64),
            function="mean",
            time_window=timedelta(hours=12),
            name="average_amount_12h",
        ),
        Aggregate(
            input_column=Field("AMOUNT", Float64),
            function="mean",
            time_window=timedelta(hours=24),
            name="average_amount_24h",
        ),
    ],
)
def user_transaction_amount_averages(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """
The aggregation_interval parameter defines the interval by which the window advances. Tecton will update the feature value in the online store after the aggregation interval has elapsed, assuming there was at least one event for that key.
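To make the windowing concrete, here is a plain-Python sketch (not Tecton code, and not how the Aggregation Engine is implemented) of a 1-hour mean that advances in 10-minute intervals: the value served after an interval boundary is the mean over events in the trailing window ending at that boundary.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)  # time_window from the example above

# (event_time, AMOUNT) events for a single key
events = [
    (datetime(2023, 1, 1, 0, 5), 10.0),
    (datetime(2023, 1, 1, 0, 25), 30.0),
    (datetime(2023, 1, 1, 0, 55), 20.0),
]


def average_amount(as_of: datetime) -> float:
    """Mean of AMOUNT over the trailing window, evaluated at an
    aggregation interval boundary (here, every 10 minutes)."""
    in_window = [amt for ts, amt in events if as_of - WINDOW <= ts < as_of]
    return sum(in_window) / len(in_window)


# After the 01:00 boundary, all three events fall inside the 1h window.
print(average_amount(datetime(2023, 1, 1, 1, 0)))  # 20.0
```

Each 10-minute boundary triggers a fresh evaluation, so an event lands in the served value only after the interval containing it closes.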
Continuous Window Aggregations
If you need streaming features that update within seconds (rather than once per aggregation_interval), you can use continuous time windows by setting stream_processing_mode to StreamProcessingMode.CONTINUOUS:
from tecton import stream_feature_view, Aggregate, StreamProcessingMode
from tecton.types import Field, Int32
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines a continuous streaming feature.
# It counts the number of non-fraudulent transactions per user over
# 1-minute, 30-minute, and 1-hour time windows.
# The expected freshness for these features is ~5 seconds.
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    stream_processing_mode=StreamProcessingMode.CONTINUOUS,
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("transaction", Int32), function="count", time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field("transaction", Int32), function="count", time_window=timedelta(minutes=30)),
        Aggregate(input_column=Field("transaction", Int32), function="count", time_window=timedelta(hours=1)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Number of transactions a user has made recently",
)
def user_continuous_transaction_count(transactions):
    return f"""
        SELECT
            user_id,
            1 AS transaction,
            timestamp
        FROM
            {transactions}
        """
How to choose between continuous and sliding windows
The stream_processing_mode parameter defines how quickly Stream Feature Views will process new events. The processing mode options are StreamProcessingMode.TIME_INTERVAL and StreamProcessingMode.CONTINUOUS. The default is StreamProcessingMode.TIME_INTERVAL.
When to Use StreamProcessingMode.CONTINUOUS
You should use StreamProcessingMode.CONTINUOUS if your model performance depends on features updating within seconds. For example, a fraud detection use case may need features that include transactions made only a few seconds earlier.
When using continuous processing, Tecton will store all transformed events in the online store and run the full aggregations at the time of request.
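As an illustration of that request-time behavior (plain Python, not Tecton internals): continuous mode keeps the transformed events and runs the aggregation at read time, so a new event is reflected immediately instead of waiting for an interval to close.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)
stored_events = []  # transformed events written to the online store for one key


def ingest(event_time, value):
    # Continuous mode: each transformed event is stored as-is.
    stored_events.append((event_time, value))


def read_count(request_time):
    """Run the full count aggregation at request time."""
    return sum(1 for ts, _ in stored_events if request_time - WINDOW <= ts <= request_time)


now = datetime(2023, 1, 1, 12, 0)
ingest(now - timedelta(seconds=5), 1.0)  # event from a few seconds ago
print(read_count(now))  # 1 -- visible without waiting for an interval boundary
```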
If second-level feature freshness is not good enough for your use case, and you need millisecond-level freshness, please consider using Tecton's Stream Ingest API.
You should use the default StreamProcessingMode.TIME_INTERVAL to optimize costs if your model can tolerate features updating less quickly. With StreamProcessingMode.TIME_INTERVAL:
- Feature Views that use time-windowed aggregations can have a minimum aggregation interval of 1 minute.
- Feature Views that do not use time-windowed aggregations will typically take about 1 minute to update feature values.
StreamProcessingMode.CONTINUOUS may lead to higher infrastructure costs due to more frequent feature writes and checkpointing updates. A longer aggregation interval can lower costs, especially if a single key may have multiple events in a short period of time that can be grouped into a single interval.
StreamProcessingMode.CONTINUOUS on EMR
StreamProcessingMode.CONTINUOUS can cause significant S3 costs for customers using Tecton with EMR due to the frequency of writing Spark Streaming checkpoints to S3. The Databricks implementation of Spark Streaming has much lower checkpointing costs.
Productionizing a Stream
For a Stream Feature View used in production where late data loss is unacceptable, it is recommended to set the Stream Data Source's watermark_delay_threshold to your stream retention period, or to at least 24 hours. This configures Spark Structured Streaming not to drop data when it processes events late or out of order. The tradeoff of a longer watermark delay is a greater amount of in-memory state held by the streaming job.
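The watermark behavior can be sketched in plain Python (an approximation of Spark Structured Streaming's rule, not Tecton code): events with timestamps older than the maximum event time seen minus watermark_delay_threshold are dropped, so a larger threshold tolerates later-arriving data at the cost of more retained state.

```python
from datetime import datetime, timedelta


def accepts(event_time, max_event_time_seen, watermark_delay_threshold):
    """Approximate Structured Streaming rule: an event is dropped if it is
    older than the watermark (latest event time minus the delay threshold)."""
    watermark = max_event_time_seen - watermark_delay_threshold
    return event_time >= watermark


latest = datetime(2023, 1, 1, 12, 0)
late_event = datetime(2023, 1, 1, 9, 0)  # arrives 3 hours behind the stream

print(accepts(late_event, latest, timedelta(hours=1)))   # False: dropped
print(accepts(late_event, latest, timedelta(hours=24)))  # True: retained
```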
The ttl (time-to-live) parameter
The value of ttl (a Stream Feature View parameter) affects the availability of feature data in the online store, the generation of training feature data, and the deletion of feature values from the online store.
For more details, see The ttl Parameter in Feature Views.
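As a rough sketch of the online-store effect only (plain Python; the linked page covers the exact semantics, including training data generation and deletion): a feature value is served only while the request time is within ttl of the feature's timestamp.

```python
from datetime import datetime, timedelta


def is_live(feature_ts, request_time, ttl):
    """Return True if the feature value is still servable from the online
    store, i.e. the request falls within ttl of the feature timestamp."""
    return request_time - feature_ts < ttl


ts = datetime(2023, 1, 1)
print(is_live(ts, ts + timedelta(days=1), ttl=timedelta(days=2)))  # True
print(is_live(ts, ts + timedelta(days=3), ttl=timedelta(days=2)))  # False
```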