โก๏ธ Building Streaming Features
Real-time data can make all the difference for real-time models, but leveraging it can be quite the challenge.
With Tecton you can build millisecond-fresh features using plain Python and without any complex streaming infrastructure! Best of all, you can test it all locally and iterate in a notebook to quickly train better models that operate consistently online and offline.
This tutorial assumes some basic familiarity with Tecton. If you are new to Tecton, we recommend first checking out the Quickstart Tutorial which walks through an end-to-end journey of building a real-time ML application with Tecton.
Most of this tutorial is intended to be run in a notebook. Some steps will explicitly note to run commands in your terminal.
Be sure to install the Tecton SDK before getting started. You will also need to install the following packages, used for reading data from S3:
pip install s3fs fsspec
In this tutorial we will:
- Create a streaming data source
- Define and test streaming features
- Query data online and offline
Before you start, run tecton login [you-org-account-name].tecton.ai
in your
CLI. Be sure to fill in your organization's Tecton account name.
Let's get started by running the code below!
import tecton
import pandas as pd
from datetime import datetime
from pprint import pprint
tecton.set_validation_mode("auto")
๐ Create a Stream Source for ingesting real-time dataโ
First, let's define a local Stream Source that supports ingesting real-time data. Once productionized, this will give us an online HTTP endpoint to push events to in real-time which Tecton will then transform into features for online inference.
As part of our Stream Source, we also register a historical log of the stream
via the batch_config
parameter. Tecton uses this historical log for backfills
and offline development.
Alternatively, you can have Tecton maintain this historical log for you! Simply
add the log_offline=True
parameter to the PushConfig
and omit the
batch_config
. With this setup, Tecton will log all ingested events and use
those to backfill any features that use this source.
from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64
transactions = StreamSource(
name="transactions",
stream_config=PushConfig(),
batch_config=FileConfig(
uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
file_format="parquet",
timestamp_field="timestamp",
),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
๐ Test the new Stream Sourceโ
We can pull a range of offline data from a Stream Source's historical event log
using get_dataframe()
.
start = datetime(2023, 5, 1)
end = datetime(2023, 8, 1)
df = transactions.get_dataframe(start, end).to_pandas()
display(df)
user_id | transaction_id | category | amt | is_fraud | merchant | merch_lat | merch_long | timestamp | |
---|---|---|---|---|---|---|---|---|---|
0 | user_884240387242 | 3eb88afb219c9a10f5130d0b89a13451 | gas_transport | 68.23 | 0 | fraud_Kutch, Hermiston and Farrell | 42.71 | -78.3386 | 2023-06-20 10:26:41 |
1 | user_268514844966 | 72e23b9193f97c2ba654854a66890432 | misc_pos | 32.98 | 0 | fraud_Lehner, Reichert and Mills | 39.1536 | -122.364 | 2023-06-20 12:57:20 |
2 | user_722584453020 | db7a41ce2d16a4452c973418d9e544b1 | home | 4.5 | 0 | fraud_Koss, Hansen and Lueilwitz | 33.0332 | -105.746 | 2023-06-20 14:49:59 |
3 | user_337750317412 | edfc42f7bc4b86d8c142acefb88c4565 | misc_pos | 7.68 | 0 | fraud_Buckridge PLC | 40.6828 | -88.8084 | 2023-06-20 14:50:13 |
4 | user_934384811883 | 93d28b6d2e5afebf9c40304aa709ab29 | kids_pets | 68.97 | 1 | fraud_Lubowitz-Walter | 39.1443 | -96.125 | 2023-06-20 15:55:09 |
๐ฉโ๐ป Define and test streaming features locallyโ
Now that we have a Stream Source defined, we are ready to create some features.
Let's use this data source to create the following 3 features:
- A user's total transaction amount in the last 1 minute
- A user's total transaction amount in the last 1 hour
- A user's total transaction amount in the last 30 days
To build these features, we will define a Stream Feature View that consumes from
our transactions
Stream Source.
The Stream Feature View transformation operates on events in a Pandas Dataframe and can do any arbitrary projections, filters, or expressions as needed. It's just Python!
The Python transformation runs before the aggregations so you can transform data as needed before it is aggregated.
from tecton import Entity, stream_feature_view, Aggregation
from datetime import datetime, timedelta
user = Entity(name="user", join_keys=["user_id"])
@stream_feature_view(
source=transactions,
entities=[user],
mode="pandas",
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
aggregations=[
Aggregation(function="sum", column="amt", time_window=timedelta(minutes=1)),
Aggregation(function="sum", column="amt", time_window=timedelta(hours=1)),
Aggregation(function="sum", column="amt", time_window=timedelta(days=30)),
],
)
def user_transaction_amount_totals(transactions):
return transactions[["user_id", "timestamp", "amt"]]
๐งช Test features interactivelyโ
Now that we've defined and validated our Feature View, we can use
get_historical_features
to produce a range of feature values and check out the
feature data.
These features are calculated against the Stream Source's historical event log.
start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)
df = user_transaction_amount_totals.get_historical_features(start_time=start, end_time=end).to_pandas()
display(df)
user_id | amt_sum_1m_continuous | amt_sum_1h_continuous | amt_sum_30d_continuous | timestamp | _effective_timestamp | |
---|---|---|---|---|---|---|
0 | user_268308151877 | 108.64 | 108.64 | 590.93 | 2022-01-01 18:08:24 | 2022-01-02 00:00:00 |
1 | user_268308151877 | 91.75 | 91.75 | 682.68 | 2022-01-04 17:57:30 | 2022-01-05 00:00:00 |
2 | user_268308151877 | 4.29 | 4.29 | 686.97 | 2022-01-05 08:39:52 | 2022-01-06 00:00:00 |
3 | user_268308151877 | 5.14 | 5.14 | 682.77 | 2022-01-12 08:03:12 | 2022-01-13 00:00:00 |
4 | user_268308151877 | 54.39 | 54.39 | 737.16 | 2022-01-12 18:05:31 | 2022-01-13 00:00:00 |
๐งฎ Generate training dataโ
We can also include our new feature in a Feature Service and generate historical training data for a set of training events.
from tecton import FeatureService
fraud_detection_feature_service = FeatureService(
name="fraud_detection_feature_service", features=[user_transaction_amount_totals]
)
training_events = pd.read_parquet(
"s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq", storage_options={"anon": True}
)[["user_id", "timestamp", "amt", "is_fraud"]]
training_data = fraud_detection_feature_service.get_historical_features(training_events).to_pandas().fillna(0)
display(training_data)
user_id | timestamp | amt | is_fraud | user_transaction_amount_totals__amt_sum_1m_continuous | user_transaction_amount_totals__amt_sum_1h_continuous | user_transaction_amount_totals__amt_sum_30d_continuous | |
---|---|---|---|---|---|---|---|
0 | user_699955105085 | 2023-06-25 16:46:10 | 54.51 | 0 | 54.51 | 54.51 | 164.18 |
1 | user_699955105085 | 2023-06-26 18:03:09 | 10.76 | 0 | 10.76 | 10.76 | 173.75 |
2 | user_699955105085 | 2023-06-30 08:04:58 | 15.62 | 0 | 15.62 | 15.62 | 189.37 |
3 | user_699955105085 | 2023-07-02 06:57:33 | 41.21 | 0 | 41.21 | 41.21 | 230.58 |
4 | user_699955105085 | 2023-07-06 21:00:22 | 694.06 | 0 | 694.06 | 694.06 | 920.29 |
If you encounter the error
Binder Error: Referenced column "__index_level_0__" not found in FROM clause!
while using pandas.read_parquet
, it may be due to an unexpected index column.
To resolve this, drop the index before loading the DataFrame. Here's an example:
# Reset index and drop any default index column from the DataFrame
df[['user_id', 'event', 'timestamp']].reset_index(drop=True)
# Load the parquet file without setting an index column
df = pd.read_parquet('example.parquet', index_col=None)
๐ Apply our Stream Source and Stream Feature View to a Workspace.โ
Once we are happy with our Stream Source and Stream Feature View we can copy the definitions into our Feature Repository and apply our changes to a production workspace using the Tecton CLI.
Note: The workspace must be a live workspace in order to push events to it.
For more information on working with Feature Repositories or applying changes to workspaces, check out the Quick Start tutorial or Feature Development Workflow pages.
On our Feature View we've added four parameters to enable backfilling, online ingestion, and offline materialization to the Feature Store:
online=True
offline=True
feature_start_time=datetime(2023,5,1)
batch_schedule=timedelta(days=1)
feature_repo.py
from tecton import (
Entity,
BatchSource,
FileConfig,
stream_feature_view,
Aggregation,
StreamSource,
PushConfig,
FeatureService,
)
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta
transactions = StreamSource(
name="transactions",
stream_config=PushConfig(),
batch_config=FileConfig(
uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
file_format="parquet",
timestamp_field="timestamp",
),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
user = Entity(name="user", join_keys=["user_id"])
@stream_feature_view(
source=transactions,
entities=[user],
mode="pandas",
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
aggregations=[
Aggregation(function="sum", column="amt", time_window=timedelta(minutes=1)),
Aggregation(function="sum", column="amt", time_window=timedelta(hours=1)),
Aggregation(function="sum", column="amt", time_window=timedelta(days=30)),
],
online=True,
offline=True,
feature_start_time=datetime(2023, 5, 1),
batch_schedule=timedelta(days=1),
)
def user_transaction_amount_totals(transactions):
return transactions[["user_id", "timestamp", "amt"]]
fraud_detection_feature_service = FeatureService(
name="fraud_detection_feature_service", features=[user_transaction_amount_totals]
)
โ Run the following commands in your terminal to select a workspace and apply your changes:
tecton login [your-org-account-name].tecton.ai
tecton workspace select [my-live-workspace]
tecton apply
โก๏ธ Ingest events and watch values update in real time!โ
Now that our Stream Source has been productionised, we can start sending events to it and watch our aggregations update in real-time!
This step requires generating and setting a Tecton API key.
You can do so with the following commands:
tecton service-account create -n [my-service-account]
Next, give this service account edit permissions to your workspace to allow you to push events to your Stream Source. Copy the service account ID from the output of the step above.
tecton access-control assign-role -r editor -s [service-account-id] -w [my-live-workspace]
# Use your API key generated in the step above
TECTON_API_KEY = os.environ["TECTON_API_KEY"]
tecton.set_credentials(tecton_api_key=TECTON_API_KEY)
# Replace with your workspace name
ws = tecton.get_workspace("[my-live-workspace]")
transactions_stream_source = ws.get_data_source("transactions")
fraud_detection_feature_service = ws.get_feature_service("fraud_detection_feature_service")
โญ๏ธ Try repeatedly running these steps in quick succession and watch feature values update in real-time! โญ๏ธ
transactions_stream_source.ingest({"user_id": "user_123", "timestamp": datetime.utcnow(), "amt": 100}, dry_run=False)
features = fraud_detection_feature_service.get_online_features(join_keys={"user_id": "user_123"})
pprint(features.to_dict())
Out:
{'stream_user_transaction_amount_averages.amt_sum_1m_continuous': 100.0, 'stream_user_transaction_amount_averages.amt_sum_1h_continuous': 100.0, 'stream_user_transaction_amount_averages.amt_sum_30d_continuous': 100.0}
The .ingest()
method makes it easy to push events from a notebook. In
production we recommend pushing events directly to the HTTP endpoint for the
best performance.
The same goes for reading online data from a Feature Service via
.get_online_features()
. For best performance we recommend reading directly
from the REST API or using our [Python Client Library]
โญ๏ธ Conclusionโ
There you have it! Using nothing but Python and a local dev environment we were able to get real-time features running online and ready to consume by a production model.