Version: 0.8

⚡️ Building Streaming Features

Real-time data can make all the difference for real-time models, but leveraging it can be quite the challenge.

With Tecton you can build millisecond-fresh features using plain Python and without any complex streaming infrastructure! Best of all, you can test it all locally and iterate in a notebook to quickly train better models that operate consistently online and offline.

note

This tutorial assumes some basic familiarity with Tecton. If you are new to Tecton, we recommend first checking out the Quickstart Tutorial which walks through an end-to-end journey of building a real-time ML application with Tecton.

tip

Most of this tutorial is intended to be run in a notebook. Steps that must be run in your terminal are called out explicitly.

Be sure to install the Tecton SDK before getting started. You will also need to install the following packages, used for reading data from S3:

pip install s3fs fsspec

In this tutorial we will:

  1. Create a streaming data source
  2. Define and test streaming features
  3. Query data online and offline

Before you start, run tecton login [your-org-account-name].tecton.ai in your CLI. Be sure to fill in your organization's Tecton account name.

Let's get started by running the code below!

import tecton
import pandas as pd
from datetime import datetime
from pprint import pprint

tecton.set_validation_mode("auto")

🌊 Create a Stream Source for ingesting real-time data

First, let's define a local Stream Source that supports ingesting real-time data. Once productionized, this will give us an online HTTP endpoint to push events to in real-time which Tecton will then transform into features for online inference.

As part of our Stream Source, we also register a historical log of the stream via the batch_config parameter. Tecton uses this historical log for backfills and offline development.

tip

Alternatively, you can have Tecton maintain this historical log for you! Simply add the log_offline=True parameter to the PushConfig and omit the batch_config. With this setup, Tecton will log all ingested events and use those to backfill any features that use this source.
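As a sketch of the alternative described in this tip, the Stream Source below sets log_offline=True on the PushConfig and omits batch_config. The name transactions_logged is a hypothetical name chosen for illustration; the parameter and the rest of the definition are taken from this tutorial.

```python
from tecton import StreamSource, PushConfig
from tecton.types import Field, String, Timestamp, Float64

# Hypothetical variant of the tutorial's source: Tecton maintains the
# historical log itself, so no batch_config is supplied.
transactions_logged = StreamSource(
    name="transactions_logged",
    stream_config=PushConfig(log_offline=True),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
```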

from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64


transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)

📊 Test the new Stream Source

We can pull a range of offline data from a Stream Source's historical event log using get_dataframe().

start = datetime(2023, 5, 1)
end = datetime(2023, 8, 1)

df = transactions.get_dataframe(start, end).to_pandas()
display(df)

   user_id            transaction_id                    category       amt    is_fraud  merchant                            merch_lat  merch_long  timestamp
0  user_884240387242  3eb88afb219c9a10f5130d0b89a13451  gas_transport  68.23  0         fraud_Kutch, Hermiston and Farrell  42.71      -78.3386    2023-06-20 10:26:41
1  user_268514844966  72e23b9193f97c2ba654854a66890432  misc_pos       32.98  0         fraud_Lehner, Reichert and Mills    39.1536    -122.364    2023-06-20 12:57:20
2  user_722584453020  db7a41ce2d16a4452c973418d9e544b1  home           4.5    0         fraud_Koss, Hansen and Lueilwitz    33.0332    -105.746    2023-06-20 14:49:59
3  user_337750317412  edfc42f7bc4b86d8c142acefb88c4565  misc_pos       7.68   0         fraud_Buckridge PLC                 40.6828    -88.8084    2023-06-20 14:50:13
4  user_934384811883  93d28b6d2e5afebf9c40304aa709ab29  kids_pets      68.97  1         fraud_Lubowitz-Walter               39.1443    -96.125     2023-06-20 15:55:09

👩‍💻 Define and test streaming features locally

Now that we have a Stream Source defined, we are ready to create some features.

Let's use this data source to create the following 3 features:

  • A user's total transaction amount in the last 1 minute
  • A user's total transaction amount in the last 1 hour
  • A user's total transaction amount in the last 30 days

To build these features, we will define a Stream Feature View that consumes from our transactions Stream Source.

The Stream Feature View transformation operates on events in a Pandas Dataframe and can do any arbitrary projections, filters, or expressions as needed. It's just Python!

info

The Python transformation runs before the aggregations so you can transform data as needed before it is aggregated.

from tecton import Entity, stream_feature_view, Aggregation
from datetime import datetime, timedelta


user = Entity(name="user", join_keys=["user_id"])


@stream_feature_view(
    source=transactions,
    entities=[user],
    mode="pandas",
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
    aggregations=[
        Aggregation(function="sum", column="amt", time_window=timedelta(minutes=1)),
        Aggregation(function="sum", column="amt", time_window=timedelta(hours=1)),
        Aggregation(function="sum", column="amt", time_window=timedelta(days=30)),
    ],
)
def user_transaction_amount_totals(transactions):
    return transactions[["user_id", "timestamp", "amt"]]
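To build intuition for what the three sum aggregations produce, here is a minimal plain-Python sketch that does not use Tecton. It only illustrates the arithmetic, not how Tecton computes features. The events are invented, the helper name window_sums is hypothetical, and the window boundary (start exclusive, end inclusive) is an assumption made for this sketch.

```python
from datetime import datetime, timedelta

# Windows mirror the three Aggregation definitions in the Feature View.
WINDOWS = {
    "amt_sum_1m_continuous": timedelta(minutes=1),
    "amt_sum_1h_continuous": timedelta(hours=1),
    "amt_sum_30d_continuous": timedelta(days=30),
}


def window_sums(events, user_id, as_of):
    """Sum `amt` per window for one user over events in (as_of - window, as_of]."""
    totals = {}
    for name, window in WINDOWS.items():
        totals[name] = sum(
            e["amt"]
            for e in events
            if e["user_id"] == user_id and as_of - window < e["timestamp"] <= as_of
        )
    return totals


# Hypothetical events, invented for illustration only.
now = datetime(2023, 6, 20, 12, 0, 0)
events = [
    {"user_id": "user_123", "timestamp": now - timedelta(seconds=30), "amt": 10.0},
    {"user_id": "user_123", "timestamp": now - timedelta(minutes=20), "amt": 5.0},
    {"user_id": "user_123", "timestamp": now - timedelta(days=3), "amt": 2.5},
    {"user_id": "user_456", "timestamp": now - timedelta(seconds=5), "amt": 99.0},
]

totals = window_sums(events, "user_123", now)
print(totals)
```

For user_123, only the most recent event falls inside the 1-minute window, two events fall inside the 1-hour window, and all three fall inside the 30-day window. The event from user_456 never contributes because aggregations are grouped by the entity's join key.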

🧪 Test features interactively

Now that we've defined and validated our Feature View, we can use get_historical_features to produce a range of feature values and check out the feature data.

info

These features are calculated against the Stream Source's historical event log.

start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)

df = user_transaction_amount_totals.get_historical_features(start_time=start, end_time=end).to_pandas()

display(df)

   user_id            amt_sum_1m_continuous  amt_sum_1h_continuous  amt_sum_30d_continuous  timestamp            _effective_timestamp
0  user_268308151877  108.64                 108.64                 590.93                  2022-01-01 18:08:24  2022-01-02 00:00:00
1  user_268308151877  91.75                  91.75                  682.68                  2022-01-04 17:57:30  2022-01-05 00:00:00
2  user_268308151877  4.29                   4.29                   686.97                  2022-01-05 08:39:52  2022-01-06 00:00:00
3  user_268308151877  5.14                   5.14                   682.77                  2022-01-12 08:03:12  2022-01-13 00:00:00
4  user_268308151877  54.39                  54.39                  737.16                  2022-01-12 18:05:31  2022-01-13 00:00:00

🧮 Generate training data

We can also include our new Feature View in a Feature Service and generate historical training data for a set of training events.

from tecton import FeatureService

fraud_detection_feature_service = FeatureService(
    name="fraud_detection_feature_service", features=[user_transaction_amount_totals]
)

training_events = pd.read_parquet(
    "s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq", storage_options={"anon": True}
)[["user_id", "timestamp", "amt", "is_fraud"]]

training_data = fraud_detection_feature_service.get_historical_features(training_events).to_pandas().fillna(0)
display(training_data)

   user_id            timestamp            amt     is_fraud  user_transaction_amount_totals__amt_sum_1m_continuous  user_transaction_amount_totals__amt_sum_1h_continuous  user_transaction_amount_totals__amt_sum_30d_continuous
0  user_699955105085  2023-06-25 16:46:10  54.51   0         54.51                                                  54.51                                                  164.18
1  user_699955105085  2023-06-26 18:03:09  10.76   0         10.76                                                  10.76                                                  173.75
2  user_699955105085  2023-06-30 08:04:58  15.62   0         15.62                                                  15.62                                                  189.37
3  user_699955105085  2023-07-02 06:57:33  41.21   0         41.21                                                  41.21                                                  230.58
4  user_699955105085  2023-07-06 21:00:22  694.06  0         694.06                                                 694.06                                                 920.29

note

If you encounter the error Binder Error: Referenced column "__index_level_0__" not found in FROM clause! while using pandas.read_parquet, it may be due to an unexpected index column. To resolve this, reset the index after loading the DataFrame so that no index column is carried along. Here's an example:

# Load the parquet file
df = pd.read_parquet("example.parquet")

# Select the needed columns and drop the index, keeping the result
df = df[["user_id", "event", "timestamp"]].reset_index(drop=True)
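To illustrate what generating training data for a set of training events involves, the sketch below computes a 30-day sum for each training event using only transactions at or before that event's timestamp, so later transactions do not leak into earlier rows. This is a plain-Python illustration under stated assumptions, not Tecton's implementation. The data is invented and the helper name point_in_time_sum is hypothetical.

```python
from datetime import datetime, timedelta


def point_in_time_sum(history, user_id, event_time, window):
    """Sum amounts for `user_id` in the window ending at `event_time` (inclusive)."""
    return sum(
        amt
        for uid, ts, amt in history
        if uid == user_id and event_time - window < ts <= event_time
    )


# Hypothetical transaction log: (user_id, timestamp, amt).
history = [
    ("user_1", datetime(2023, 6, 1, 9, 0), 20.0),
    ("user_1", datetime(2023, 6, 10, 9, 0), 30.0),
    ("user_1", datetime(2023, 6, 20, 9, 0), 50.0),
]

# Hypothetical training events: (user_id, timestamp, is_fraud).
training_events = [
    ("user_1", datetime(2023, 6, 10, 9, 0), 0),
    ("user_1", datetime(2023, 6, 20, 9, 0), 1),
]

# One training row per event, with the feature evaluated as of the event time.
training_rows = [
    {
        "user_id": uid,
        "timestamp": ts,
        "is_fraud": label,
        "amt_sum_30d": point_in_time_sum(history, uid, ts, timedelta(days=30)),
    }
    for uid, ts, label in training_events
]

for row in training_rows:
    print(row)
```

The first training row ignores the June 20 transaction because it had not happened yet at that row's timestamp. The second row includes all three transactions.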

🚀 Apply our Stream Source and Stream Feature View to a Workspace

Once we are happy with our Stream Source and Stream Feature View we can copy the definitions into our Feature Repository and apply our changes to a production workspace using the Tecton CLI.

Note: The workspace must be a live workspace in order to push events to it.

tip

For more information on working with Feature Repositories or applying changes to workspaces, check out the Quick Start tutorial or Feature Development Workflow pages.

On our Feature View we've added four parameters to enable backfilling, online ingestion, and offline materialization to the Feature Store:

  • online=True
  • offline=True
  • feature_start_time=datetime(2023,5,1)
  • batch_schedule=timedelta(days=1)

feature_repo.py

from tecton import (
    Entity,
    BatchSource,
    FileConfig,
    stream_feature_view,
    Aggregation,
    StreamSource,
    PushConfig,
    FeatureService,
)
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta


transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)

user = Entity(name="user", join_keys=["user_id"])


@stream_feature_view(
    source=transactions,
    entities=[user],
    mode="pandas",
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
    aggregations=[
        Aggregation(function="sum", column="amt", time_window=timedelta(minutes=1)),
        Aggregation(function="sum", column="amt", time_window=timedelta(hours=1)),
        Aggregation(function="sum", column="amt", time_window=timedelta(days=30)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2023, 5, 1),
    batch_schedule=timedelta(days=1),
)
def user_transaction_amount_totals(transactions):
    return transactions[["user_id", "timestamp", "amt"]]


fraud_detection_feature_service = FeatureService(
    name="fraud_detection_feature_service", features=[user_transaction_amount_totals]
)

✅ Run the following commands in your terminal to select a workspace and apply your changes:

tecton login [your-org-account-name].tecton.ai
tecton workspace select [my-live-workspace]
tecton apply

⚡️ Ingest events and watch values update in real time!

Now that our Stream Source has been productionized, we can start sending events to it and watch our aggregations update in real time!

note

This step requires generating and setting a Tecton API key.

You can do so with the following commands:

tecton service-account create -n [my-service-account]

Next, give this service account edit permissions to your workspace to allow you to push events to your Stream Source. Copy the service account ID from the output of the step above.

tecton access-control assign-role -r editor -s [service-account-id] -w [my-live-workspace]

import os

# Use your API key generated in the step above, exported as TECTON_API_KEY
TECTON_API_KEY = os.environ["TECTON_API_KEY"]
tecton.set_credentials(tecton_api_key=TECTON_API_KEY)

# Replace with your workspace name
ws = tecton.get_workspace("[my-live-workspace]")
transactions_stream_source = ws.get_data_source("transactions")
fraud_detection_feature_service = ws.get_feature_service("fraud_detection_feature_service")
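Reading os.environ["TECTON_API_KEY"] raises a bare KeyError when the variable is not set. As a small convenience, a helper along the following lines can fail early with a clearer message. The name read_api_key is hypothetical and it is not part of the Tecton SDK.

```python
import os


def read_api_key(env=os.environ, name="TECTON_API_KEY"):
    """Return the API key from `env`, failing early with a readable message."""
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(f"Set the {name} environment variable before running this step.")
    return key


# A plain dict stands in for the environment so the sketch runs anywhere.
print(read_api_key({"TECTON_API_KEY": "example-key"}))
```

In a notebook, the returned value can be passed to tecton.set_credentials(tecton_api_key=...) in place of the direct os.environ lookup.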

⭐️ Try repeatedly running these steps in quick succession and watch feature values update in real time! ⭐️

transactions_stream_source.ingest({"user_id": "user_123", "timestamp": datetime.utcnow(), "amt": 100}, dry_run=False)
features = fraud_detection_feature_service.get_online_features(join_keys={"user_id": "user_123"})

pprint(features.to_dict())

Out:

{'user_transaction_amount_totals.amt_sum_1m_continuous': 100.0,
 'user_transaction_amount_totals.amt_sum_1h_continuous': 100.0,
 'user_transaction_amount_totals.amt_sum_30d_continuous': 100.0}

tip

The .ingest() method makes it easy to push events from a notebook. In production we recommend pushing events directly to the HTTP endpoint for the best performance.

The same goes for reading online data from a Feature Service via .get_online_features(). For best performance we recommend reading directly from the REST API or using our Python Client Library.
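For a rough idea of what pushing an event over HTTP could look like, the sketch below builds a POST request with the Python standard library and does not send it. The payload layout, the Tecton-key authorization scheme, and the placeholder URL are assumptions made for illustration; check them against the Stream Ingest API reference for your Tecton version before relying on them. The function name build_ingest_request is hypothetical.

```python
import json
from datetime import datetime, timezone
from urllib import request


def build_ingest_request(ingest_url, api_key, workspace, source_name, record, dry_run=True):
    """Build (but do not send) a POST request carrying one event.

    ASSUMPTION: the payload layout and the "Tecton-key" authorization scheme
    are illustrative and must be verified against the official API reference.
    """
    payload = {
        "workspace_name": workspace,
        "dry_run": dry_run,
        "records": {source_name: [{"record": record}]},
    }
    return request.Request(
        ingest_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Tecton-key {api_key}", "Content-Type": "application/json"},
        method="POST",
    )


event = {
    "user_id": "user_123",
    # datetime objects are not JSON serializable, so send an ISO 8601 string in UTC.
    "timestamp": datetime(2023, 6, 20, 12, 0, tzinfo=timezone.utc).isoformat(),
    "amt": 100.0,
}

req = build_ingest_request(
    "https://example.invalid/ingest",  # hypothetical placeholder URL
    "my-api-key",  # hypothetical placeholder key
    "my-live-workspace",
    "transactions",
    event,
)

# request.urlopen(req) would send the event; it is omitted so the sketch runs offline.
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

Keeping dry_run=True as the default in this sketch mirrors the dry_run argument of .ingest() shown earlier and avoids writing test events by accident.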

⭐️ Conclusion

There you have it! Using nothing but Python and a local dev environment, we were able to get real-time features running online and ready to be consumed by a production model.
