⚡️ Building Streaming Features
Sign up at tecton.ai/explore for a free account that lets you try out this tutorial and explore Tecton's Web UI.
Real-time data can make all the difference for real-time models, but leveraging it can be quite the challenge.
With Tecton you can build millisecond-fresh features using plain Python and without any complex streaming infrastructure! Best of all, you can test it all locally and iterate in a notebook to quickly train better models that operate consistently online and offline.
This tutorial assumes some basic familiarity with Tecton. If you are new to Tecton, we recommend first checking out Building a Production AI Application with Tecton, which walks through an end-to-end journey of building a real-time ML application with Tecton.
Most of this tutorial is intended to be run in a notebook. Some steps will explicitly note to run commands in your terminal.
In this tutorial we will:
- Create a streaming data source
- Define and test streaming features
- Query data online and offline
Install Pre-Reqs
First things first, let's install the Tecton SDK and the other libraries used by this tutorial (we recommend doing this in a virtual environment):
!pip install 'tecton[rift]==1.0.0' gcsfs s3fs -q
Log in to Tecton
Next we will authenticate with your organization's Tecton account and import libraries we will need.
For users that just signed up via explore.tecton.ai, you can leave this step as is. If your organization has its own Tecton account, replace explore.tecton.ai with your account URL.
Note: You need to press Enter after pasting in your authentication code.
import tecton
import pandas as pd
from datetime import datetime
from pprint import pprint
import random, string
tecton.login("explore.tecton.ai") # replace with your org's URL if needed
tecton.conf.set("TECTON_OFFLINE_RETRIEVAL_COMPUTE_MODE", "rift")
Now we're ready to build!
Create a Stream Source for ingesting real-time data
First, let's define a local Stream Source that supports ingesting real-time data. Once productionized, this will give us an online HTTP endpoint to push events to in real-time which Tecton will then transform into features for online inference.
As part of our Stream Source, we also register a historical log of the stream via the batch_config parameter. Tecton uses this historical log for backfills and offline development.
Alternatively, you can have Tecton maintain this historical log for you! Simply add the log_offline=True parameter to the PushConfig and omit the batch_config. With this setup, Tecton will log all ingested events and use those to backfill any features that use this source.
from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64
transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)
Test the new Stream Source
We can pull a range of offline data from a Stream Source's historical event log using get_dataframe().
start = datetime(2023, 5, 1)
end = datetime(2023, 8, 1)
df = transactions_stream.get_dataframe(start, end).to_pandas()
display(df.head(5))
index | timestamp | user_id | transaction_id | merchant | merch_lat | merch_long | amount |
---|---|---|---|---|---|---|---|
0 | 2023-05-01 00:38:11.716917 | user_5560069050 | 33dd0f5c6ece08ff84d10227e83a6936 | Mervyn's | "49.7518840" | "-140.759320" | 50.77 |
1 | 2023-05-01 00:45:03.031664 | user_8277121337 | 7b220dda23f7d8813062ad0f95c579c6 | Quality Stores | "-14.0234110" | "-121.107220" | 95.36 |
2 | 2023-05-01 00:53:40.791126 | user_4409718407 | ff22d24fef0164070cdae9771d8bf9c3 | Gottschalks | "3.7894785" | "53.168767" | 12.02 |
3 | 2023-05-01 01:18:38.115718 | user_6606710651 | e924834f85f588419f181e55cb61771d | Visionworks | "-14.0913895" | "-53.801625" | 866.82 |
4 | 2023-05-01 01:56:28.194464 | user_1200838555 | 38afd7d44a13c890ed5d3bbdf96d95a0 | Cook United | "82.5886295" | "-152.906522" | 0.04 |
👩‍💻 Define and test streaming features locally
Now that we have a Stream Source defined, we are ready to create some features.
Let's use this data source to create the following 3 features:
- A user's total transaction amount in the last 1 minute
- A user's total transaction amount in the last 1 hour
- A user's total transaction amount in the last 30 days
To build these features, we will define a Stream Feature View that consumes from our transactions Stream Source.
The Stream Feature View transformation operates on events in a pandas DataFrame and can apply arbitrary projections, filters, or expressions as needed. It's just Python!
The Python transformation runs before the aggregations so you can transform data as needed before it is aggregated.
from tecton import Entity, stream_feature_view, Aggregate
from tecton.types import Field, String, Float64
from datetime import datetime, timedelta
user = Entity(name="user", join_keys=[Field("user_id", String)])
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
    ],
)
def user_transaction_amount_totals(transactions_stream):
    return transactions_stream[["user_id", "timestamp", "amount"]]
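To build intuition for what the three sum aggregations compute, here is a minimal plain-Python sketch of a trailing-window sum. It is not how Tecton implements aggregations. The window_sum helper, the sample events, and the choice to treat each window as (as_of - window, as_of] are all assumptions made for illustration.

```python
from datetime import datetime, timedelta


def window_sum(events, user_id, as_of, window):
    """Sum the amounts of one user's events that fall in (as_of - window, as_of]."""
    window_start = as_of - window
    return sum(
        event["amount"]
        for event in events
        if event["user_id"] == user_id and window_start < event["timestamp"] <= as_of
    )


# Hypothetical events, not taken from the tutorial dataset
events = [
    {"user_id": "user_a", "timestamp": datetime(2023, 4, 10, 9, 0, 0), "amount": 20.0},
    {"user_id": "user_a", "timestamp": datetime(2023, 5, 1, 12, 10, 0), "amount": 10.0},
    {"user_id": "user_a", "timestamp": datetime(2023, 5, 1, 12, 59, 30), "amount": 5.0},
    {"user_id": "user_b", "timestamp": datetime(2023, 5, 1, 12, 59, 45), "amount": 99.0},
]

# Evaluate all three windows for user_a at the same point in time
as_of = datetime(2023, 5, 1, 13, 0, 0)
windows = {"1m": timedelta(minutes=1), "1h": timedelta(hours=1), "30d": timedelta(days=30)}
totals = {label: window_sum(events, "user_a", as_of, window) for label, window in windows.items()}
print(totals)  # {'1m': 5.0, '1h': 15.0, '30d': 35.0}
```

Only the most recent event lands in the 1-minute window, while the 30-day window picks up all three of user_a's events. Events from user_b never contribute, because the aggregation is grouped by the entity's join key.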
🧪 Test features interactively
Now that we've defined and validated our Feature View, we can use get_features_in_range to produce a range of feature values and check out the feature data.
These features are calculated against the Stream Source's historical event log.
start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)
df = user_transaction_amount_totals.get_features_in_range(start_time=start, end_time=end).to_pandas()
display(df.head(5))
index | user_id | amount_sum_1m_continuous | amount_sum_1h_continuous | amount_sum_30d_continuous | _valid_to | _valid_from |
---|---|---|---|---|---|---|
0 | user_3400882595 | 0.0 | 11.6 | 534.4200000000001 | 2022-01-19 00:00:00 | 2022-01-18 00:00:00 |
1 | user_4411299629 | 0.0 | 259.5 | 259.5 | 2022-01-25 00:00:00 | 2022-01-24 00:00:00 |
2 | user_9031652113 | 0.0 | 41.22 | 41.22 | 2022-01-21 00:00:00 | 2022-01-20 00:00:00 |
3 | user_1117145948 | 0.0 | 0.0 | 5.08 | 2022-02-01 00:00:00 | 2022-01-03 00:00:00 |
4 | user_1120829659 | 0.0 | 0.0 | 125.65 | 2022-01-26 00:00:00 | 2022-01-22 00:00:00 |
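Each row in this output carries a _valid_from and a _valid_to timestamp. The sketch below shows one way to look up the value in effect at a given moment. It assumes the interval is half-open (_valid_from inclusive, _valid_to exclusive); that boundary convention and the value_at helper are assumptions for illustration, not documented Tecton behavior. The two sample rows are copied from the table above.

```python
from datetime import datetime

# Two rows copied from the sample output (30-day sum only, for brevity)
rows = [
    {
        "user_id": "user_1117145948",
        "amount_sum_30d_continuous": 5.08,
        "_valid_from": datetime(2022, 1, 3),
        "_valid_to": datetime(2022, 2, 1),
    },
    {
        "user_id": "user_1120829659",
        "amount_sum_30d_continuous": 125.65,
        "_valid_from": datetime(2022, 1, 22),
        "_valid_to": datetime(2022, 1, 26),
    },
]


def value_at(rows, user_id, moment):
    """Return the 30-day sum valid for user_id at `moment`, or None if no row covers it."""
    for row in rows:
        # Assumed half-open interval: [_valid_from, _valid_to)
        if row["user_id"] == user_id and row["_valid_from"] <= moment < row["_valid_to"]:
            return row["amount_sum_30d_continuous"]
    return None


print(value_at(rows, "user_1117145948", datetime(2022, 1, 15)))
print(value_at(rows, "user_1120829659", datetime(2022, 1, 27)))
```

The first lookup falls inside a validity range and returns that row's value; the second falls after the only matching range and returns None.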
🧮 Generate training data
We can also include our new feature in a Feature Service and generate historical training data for a set of training events.
from tecton import FeatureService
fraud_detection_feature_service_streaming = FeatureService(
name="fraud_detection_feature_service_streaming", features=[user_transaction_amount_totals]
)
# Retrieve our dataset of historical transaction data
transactions_df = pd.read_parquet("s3://tecton.ai.public/tutorials/transactions.pq", storage_options={"anon": True})
# Retrieve our dataset of labels containing transaction_id and is_fraud (set to 1 if the transaction is fraudulent or 0 otherwise)
training_labels = pd.read_parquet("s3://tecton.ai.public/tutorials/labels.pq", storage_options={"anon": True})
# Join our label dataset to our transaction data to produce a list of training events
training_events = training_labels.merge(transactions_df, on=["transaction_id"], how="left")[
    ["user_id", "timestamp", "amount", "is_fraud"]
]
# Pass our training events into Tecton to generate point-in-time correct training data
training_data = fraud_detection_feature_service_streaming.get_features_for_events(training_events).to_pandas().fillna(0)
display(training_data.sample(5))
index | user_id | timestamp | is_fraud | amount | user_transaction_amount_totals__amount_sum_30d_continuous | user_transaction_amount_totals__amount_sum_1m_continuous | user_transaction_amount_totals__amount_sum_1h_continuous |
---|---|---|---|---|---|---|---|
0 | user_1028747636 | 2021-01-03 08:42:43.668406 | 0 | 0.0 | 77.09 | 77.09 | 77.09 |
1 | user_1155940157 | 2021-01-21 03:27:42.566411 | 0 | 0.0 | 43.01 | 43.01 | 43.01 |
2 | user_1567708646 | 2021-01-20 13:57:14.832615 | 0 | 0.0 | 536.1 | 536.1 | 536.1 |
3 | user_1567708646 | 2021-01-21 18:13:41.535067 | 0 | 0.0 | 608.26 | 72.16 | 72.16 |
4 | user_1755385063 | 2021-01-05 04:19:08.782106 | 0 | 0.0 | 96.84 | 96.84 | 96.84 |
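Point-in-time correctness means each training row only sees events that had already happened at that row's timestamp, so no future information leaks into the features. The plain-Python sketch below illustrates the idea with a running total per training event. The sample data, the point_in_time_total helper, and the choice to include a transaction that shares the training row's exact timestamp are assumptions for illustration, not a description of Tecton's internals.

```python
from datetime import datetime

# Hypothetical transaction log for a single user
transactions = [
    {"user_id": "user_a", "timestamp": datetime(2021, 1, 20, 14, 0), "amount": 50.0},
    {"user_id": "user_a", "timestamp": datetime(2021, 1, 21, 18, 0), "amount": 25.0},
    {"user_id": "user_a", "timestamp": datetime(2021, 1, 25, 9, 0), "amount": 100.0},
]

# Hypothetical training events: the moments at which a prediction would have been made
training_events = [
    {"user_id": "user_a", "timestamp": datetime(2021, 1, 20, 14, 0)},
    {"user_id": "user_a", "timestamp": datetime(2021, 1, 21, 18, 0)},
]


def point_in_time_total(transactions, event):
    """Sum amounts for the event's user using only transactions at or before the event time."""
    return sum(
        txn["amount"]
        for txn in transactions
        if txn["user_id"] == event["user_id"] and txn["timestamp"] <= event["timestamp"]
    )


training_rows = [
    {**event, "amount_total": point_in_time_total(transactions, event)} for event in training_events
]
for row in training_rows:
    print(row["timestamp"], row["amount_total"])
```

The transaction on January 25 never appears in either training row, because it happened after both training events.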
If you encounter the error Binder Error: Referenced column "__index_level_0__" not found in FROM clause! when working with a DataFrame loaded via pandas.read_parquet, the cause may be an unexpected index column.
To resolve this, reset the DataFrame's index after loading it and before using it. Here's an example:
# Load the parquet file
df = pd.read_parquet("example.parquet")
# Keep only the needed columns and replace any stored index with a default one
df = df[["user_id", "event", "timestamp"]].reset_index(drop=True)
Apply our Stream Source and Stream Feature View to a Workspace
This section requires your organization to have its own Tecton account. But don't fret! If you are a user of explore.tecton.ai, we've done these steps for you. You can read through it and continue with the rest of the tutorial, picking back up at the "Ingest events and watch values update in real time!" section.
If you want to productionize your own features with your own data, you can sign up for an unrestricted free trial at tecton.ai/free-trial.
Once we are happy with our Stream Source and Stream Feature View we can copy the definitions into our Feature Repository and apply our changes to a live workspace using the Tecton CLI.
Note: The workspace must be a live workspace in order to push events to it.
For more information on working with Feature Repositories or applying changes to workspaces, check out the Quick Start tutorial or Feature Development Workflow pages.
On our Feature View we've added four parameters to enable backfilling, online ingestion, and offline materialization to the Feature Store:
online=True
offline=True
feature_start_time=datetime(2020, 1, 1)
batch_schedule=timedelta(days=1)
feature_repo.py
from tecton import (
    Entity,
    BatchSource,
    FileConfig,
    stream_feature_view,
    Aggregate,
    StreamSource,
    PushConfig,
    FeatureService,
)
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta

transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)

user = Entity(name="user", join_keys=[Field("user_id", String)])

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 1, 1),
    batch_schedule=timedelta(days=1),
)
def user_transaction_amount_totals(transactions_stream):
    return transactions_stream[["user_id", "timestamp", "amount"]]

fraud_detection_feature_service_streaming = FeatureService(
    name="fraud_detection_feature_service_streaming", features=[user_transaction_amount_totals]
)
Run the following commands in your terminal to select a workspace and apply your changes:
tecton login [your-org-account-name].tecton.ai
tecton workspace select [my-live-workspace]
tecton apply
⚡️ Ingest events and watch values update in real time!
Now that our Stream Source has been productionized, we can start sending events to it and watch our aggregations update in real time!
This step requires generating and setting a Tecton API key.
To do this, you will need to create a new Service Account and give it access to read features from your workspace.
Head to the following URL to create a new service account (replace "explore" with your organization's account name in the URL as necessary). Be sure to save the API key!
If you are using explore.tecton.ai, this account will automatically be given the necessary privileges to ingest stream events in the "prod" workspace. Otherwise, you should give the service account access to read features from your newly created workspace by following these steps:
- Navigate to the Service Account page by clicking on your new service account in the list at the URL above
- Click on "Assign Workspace Access"
- Select your workspace and give the service account the "Editor" role
Copy the generated API key into the code snippet below, replacing your-api-key. Also be sure to replace the workspace name with your newly created workspace name if necessary.
# Use your API key generated in the step above
TECTON_API_KEY = "your-api-key" # replace with your API key
WORKSPACE_NAME = "prod" # replace with your new workspace name if needed
tecton.set_credentials(tecton_api_key=TECTON_API_KEY)
ws = tecton.get_workspace(WORKSPACE_NAME)
transactions_stream_source = ws.get_data_source("transactions_stream")
fraud_detection_feature_service_streaming = ws.get_feature_service("fraud_detection_feature_service_streaming")
# Generate a random user_id for the next step
user_id = "".join(random.choices(string.ascii_letters + string.digits, k=10))
⭐️ Try repeatedly running these steps in quick succession and watch feature values update in real time! ⭐️
You can try changing the transaction amount in the first step.
Service account permissions may take a few minutes to update. Also, your first ingestion call may take longer than the rest.
# Ingest events
try:
    response = transactions_stream_source.ingest({"user_id": user_id, "timestamp": datetime.utcnow(), "amount": 100})
    pprint(response)
except Exception as e:
    print(
        "Error: Your API key permissions may not yet have updated, or perhaps you didn't set the right API key and workspace name above.\n",
        e,
    )
# Read updated feature values
try:
    features = fraud_detection_feature_service_streaming.get_online_features(join_keys={"user_id": user_id})
    pprint(features.to_dict())
except Exception as e:
    print(
        "Error: Your API key permissions may not yet have updated, or perhaps you didn't set the right API key and workspace name above.\n",
        e,
    )
Out:
{'stream_user_transaction_amount_averages.amount_sum_1m_continuous': 100.0, 'stream_user_transaction_amount_averages.amount_sum_1h_continuous': 100.0, 'stream_user_transaction_amount_averages.amount_sum_30d_continuous': 100.0}
The .ingest() method makes it easy to push events from a notebook. In production we recommend pushing events directly to the HTTP endpoint for the best performance.
The same goes for reading online data from a Feature Service via .get_online_features(). For best performance we recommend reading directly from the HTTP API or using our Python Client Library.
⭐️ Conclusion
There you have it! Using nothing but Python and a local dev environment, we were able to get real-time features running online and ready to be consumed by a production model.
Next up, check out how Tecton lets you build powerful features that can only be calculated at the time of a prediction in the Building Realtime Features tutorial.