Stream Feature View
Stream Feature Views (SFV) compute feature values from a continuous streaming data source. They support the near real-time calculation of features with a freshness of less than 1 second.
Stream Feature Views also support point-in-time correct training data generation, as well as backfills of newly created features, from a historical offline log of event data.
Streaming features commonly support time-sensitive ML use cases like fraud detection, personalization, and ad targeting, because fresh streaming events are often critical for detecting behavioral changes immediately.
Streaming data requires a different approach to aggregation and compute than batch processing. Tecton offers two compute engines specifically for Streaming Features.
- Rift: Tecton's native compute engine, providing Python-based computation that is directly managed by Tecton. Records are sent to an ingestion API, optionally transformed in Python, and then written directly to the Feature Store. This approach is the right choice if you prefer the simple experience of Python-based transformation environments, want to ingest pre-computed features, are building an event-driven architecture, or simply need very fresh features where every millisecond counts.
- Spark Structured Streaming: records are read from your streaming source (Tecton supports both Kafka and Kinesis), transformed, and written to the Feature Store by a Spark Structured Streaming job in your data plane. Spark Streaming features may be the right choice if your existing data stack already heavily relies on Spark and Spark Structured Streaming or if you have very high throughput (> 1k records per second).
Batch Sources for Stream Feature Views
Features based on streaming data can be hard to develop, test, and productionize, because stream retention on the data source is often shorter than the lookback window required by the feature.
Tecton's solution is to leverage a complementary Batch Source containing historical data (often a log or materialized table mirroring the stream). Tecton can rely on that source to compute historical values:
- In Notebook Development, Tecton uses the batch source to compute historical feature values, so a Data Scientist can develop, test, and iterate on feature definitions without orchestrating their own backfills.
- When deploying a Stream Feature View, Tecton automatically runs batch backfill jobs to populate the online store with accurate values up to the feature_start_time. This ensures that features are immediately available and correct, without having to wait for data to accumulate from the Stream Source.
After a feature has been productionized, the Stream Source takes over, processing new events in near real-time and updating feature values incrementally.
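Conceptually, this backfill-then-stream handoff works because the same time-windowed aggregation is applied to both event sources, so feature values stay consistent across the transition. A minimal pure-Python sketch of that idea (illustrative only, not Tecton's API or internals):

```python
from datetime import datetime, timedelta

# Hypothetical sketch: historical events replayed by backfill jobs and new
# events arriving on the stream feed the same windowed aggregation.
batch_log = [  # historical events (backfill source)
    {"user_id": "user_123", "timestamp": datetime(2022, 1, 1, 12, 0), "amount": 10.0},
    {"user_id": "user_123", "timestamp": datetime(2022, 1, 1, 12, 30), "amount": 20.0},
]
stream_events = [  # events arriving after deployment (stream source)
    {"user_id": "user_123", "timestamp": datetime(2022, 1, 1, 12, 45), "amount": 5.0},
]

def windowed_sum(events, user_id, as_of, window):
    """Sum `amount` for one user over the time window ending at `as_of`."""
    return sum(
        e["amount"]
        for e in events
        if e["user_id"] == user_id and as_of - window < e["timestamp"] <= as_of
    )

as_of = datetime(2022, 1, 1, 12, 50)
total = windowed_sum(batch_log + stream_events, "user_123", as_of, timedelta(hours=1))
print(total)  # 35.0 — all three events fall inside the 1-hour window
```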
Example Implementation

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
    ],
)
def user_transaction_amount_totals(transactions_stream):
    return transactions_stream[["user_id", "timestamp", "amount"]]
Stream Feature View Workflow
To use a Stream Feature View:
- Define an input source.
- Optionally, transform the stream data.
- Apply aggregations to the data to update relevant features.
Tecton will immediately and incrementally update features in the online store and, optionally, update features in the offline store according to the update interval.
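The incremental online updates described above can be sketched in plain Python (illustrative only, not Tecton's internals): state is keyed by join key, updated as each event arrives, and old events age out of the aggregation window.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Hypothetical sketch of incremental windowed aggregation: each ingested
# event updates per-key state, so the current value is immediately readable
# without recomputing over the full event history on every request.
class OnlineStoreSketch:
    def __init__(self, window):
        self.window = window
        self.events = defaultdict(list)  # join key -> recent (timestamp, amount)

    def ingest(self, user_id, timestamp, amount):
        self.events[user_id].append((timestamp, amount))

    def get(self, user_id, now):
        # Evict events older than the window, then return the running sum.
        self.events[user_id] = [
            (ts, amt) for ts, amt in self.events[user_id] if now - ts < self.window
        ]
        return sum(amt for _, amt in self.events[user_id])

store = OnlineStoreSketch(window=timedelta(hours=1))
store.ingest("user_123", datetime(2022, 1, 1, 12, 0), 10.0)
store.ingest("user_123", datetime(2022, 1, 1, 12, 30), 25.0)
print(store.get("user_123", datetime(2022, 1, 1, 12, 45)))  # 35.0
```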
What's Next
Now that you understand and can implement Streaming Feature Views, consider exploring these topics next:
- Realtime Feature Views for request-time feature computation.
- Stream Ingest API to push events via HTTP.
- Monitoring Streaming Jobs to track data freshness and errors.
- Testing Stream Features for validating behavior in dev.
- Create a Feature Service to serve the features defined in your Feature Views.
As you continue developing your Feature Views, learn about different feature design patterns, explore the various Feature types like calculation and aggregation features, and read more about how transformations and embeddings work in Tecton.
Want to deploy streaming features into production? Learn more in Deploying Features to Production.
How to Use Streaming Feature Views
Define a Streaming Feature View
- Create a StreamSource that defines both your live stream and historical backfill source.
- Define your streaming logic using @stream_feature_view.
- Apply the Feature View with the Tecton CLI: tecton apply
Use in a Feature Service
Include Stream Feature Views in a FeatureService to serve features.
from tecton import FeatureService
fraud_detection = FeatureService(name="fraud_detection", features=[user_transaction_amount_totals])
Test with Historical Data
Test your Stream Feature View with historical data to verify that its transformation logic produces correct results over a realistic sequence of input events.
from datetime import datetime

start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)
df = user_transaction_amount_totals.get_features_in_range(start_time=start, end_time=end).to_pandas()
df.head()
Real-Time Feature Serving
To simulate live feature updates and check the online store:
# Ingest a new record through the Stream Source
record = transactions_stream.ingest({"user_id": "user_123", "timestamp": datetime.utcnow(), "amount": 100.0})
# Retrieve updated features in real time
features = user_transaction_amount_totals.get_online_features(join_keys={"user_id": "user_123"}).to_dict()
Supported Compute Modes

| Mode | Description | Supported Engines |
|---|---|---|
| "pandas" | Lightweight Python transformations | Rift |
| "spark_sql" | Scalable Spark transformations | EMR, Databricks |
| "bigquery_sql" | Push-down streaming logic in BigQuery | Rift |
| "snowflake_sql" | Push-down logic in Snowflake | Rift |
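In the SQL-based modes, the decorated function returns a SQL string templated with the source, rather than a DataFrame. A minimal sketch of that shape (the function and table names here are hypothetical, and the sketch runs without Tecton):

```python
# Hypothetical sketch: in SQL-based modes ("spark_sql", "snowflake_sql",
# "bigquery_sql") the transformation function returns a SQL string, and the
# source argument is substituted in as a table reference.
def user_transactions_sql(transactions_stream: str) -> str:
    return f"""
    SELECT user_id, timestamp, amount
    FROM {transactions_stream}
    """

sql = user_transactions_sql("transactions_stream_view")
print("FROM transactions_stream_view" in sql)  # True
```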
Example Implementations
- Rift (pandas)
- Spark (spark_sql)
from tecton import StreamSource, stream_feature_view, Aggregate, Entity
from tecton.types import Field, String, Float64, Timestamp
from datetime import timedelta, datetime
from tecton import PushConfig, FileConfig
transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[
        Field("user_id", String),
        Field("timestamp", Timestamp),
        Field("amount", Float64),
    ],
)

user = Entity(name="user", join_keys=[Field("user_id", String)])

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
    ],
)
def user_transaction_amount_totals(transactions_stream):
    return transactions_stream[["user_id", "timestamp", "amount"]]
from tecton import stream_feature_view, Aggregate, Entity, SparkStreamConfig
from tecton.types import Field, String, Float64, Timestamp
from datetime import timedelta, datetime
from fraud.data_sources import transactions_stream
from fraud.entities import user

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    timestamp_field="timestamp",
    batch_schedule=timedelta(minutes=10),
    stream_compute=SparkStreamConfig(
        trigger_interval=timedelta(minutes=1),
        stateful_processing_enabled=True,
    ),
    features=[
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 1, 1),
)
def user_transaction_amount_totals(transactions_stream):
    return f"""
    SELECT
        user_id,
        timestamp,
        amount
    FROM
        {transactions_stream}
    """
| Parameter | Description |
|---|---|
| source | A StreamSource object that defines the stream input and optional backfill |
| entities | Entity definitions for joining (e.g., user_id) |
| timestamp_field | Event timestamp field used for feature alignment |
| features | List of Aggregate or Attribute definitions |
| aggregation_leading_edge | Determines the event alignment strategy (e.g., LATEST_EVENT_TIME) |
| mode | Execution engine: "pandas", "spark_sql", etc. |
| ttl | Time-to-live for online feature values |
| feature_start_time | Earliest time from which features should be computed |
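As an illustration of the ttl parameter's semantics, the sketch below (plain Python, not Tecton's API) serves a stored online value only while it is younger than the configured time-to-live:

```python
from datetime import datetime, timedelta

# Hypothetical sketch of ttl: an online feature value is served only while
# it is younger than the time-to-live; afterwards it is treated as expired.
def serve_online_value(value, written_at, now, ttl):
    """Return the stored value if still fresh, else None (expired)."""
    return value if now - written_at < ttl else None

ttl = timedelta(days=1)
written_at = datetime(2022, 1, 1, 12, 0)

fresh = serve_online_value(100.0, written_at, datetime(2022, 1, 1, 18, 0), ttl)
stale = serve_online_value(100.0, written_at, datetime(2022, 1, 3, 12, 0), ttl)
print(fresh, stale)  # 100.0 None
```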