Skip to main content
Version: 1.0

โšก๏ธ Building Streaming Features

Click this button to open this tutorial in Google Colab and get started with zero setup:

Open In Colab

Not yet a Tecton user?

Sign-up at explore.tecton.ai for a free account that lets you try out this tutorial and explore Tecton's Web UI.

Real-time data can make all the difference for real-time models, but leveraging it can be quite the challenge.

With Tecton you can build millisecond fresh features using plain Python and without any complex streaming infrastructure! Best of all, you can test it all locally and iterate in a notebook to quickly train better models that operate consistently online and offline.

note

This tutorial assumes some basic familiarity with Tecton. If you are new to Tecton, we recommend first checking out Building a Production AI Application with Tecton which walks through an end-to-end journey of building a real-time ML application with Tecton.

Most of this tutorial is intended to be run in a notebook. Some steps will explicitly note to run commands in your terminal.

In this tutorial we will:

  1. Create a streaming data source
  2. Define and test streaming features
  3. Query data online and offline

โš™๏ธ Install Pre-Reqsโ€‹

First things first, let's install the Tecton SDK and other libraries used by this tutorial (we recommend in a virtual environment) using:

!pip install 'tecton[rift]==1.0.0' gcsfs s3fs -q

โœ… Log in to Tectonโ€‹

Next we will authenticate with your organization's Tecton account and import libraries we will need.

For users that just signed up via explore.tecton.ai you can leave this step as is. If your organization has its own Tecton account, replace explore.tecton.ai with your account url.

Note: You need to press enter after pasting in your authentication code.

import tecton
import pandas as pd
from datetime import datetime
from pprint import pprint
import random, string

tecton.login("explore.tecton.ai") # replace with your URL

tecton.conf.set("TECTON_OFFLINE_RETRIEVAL_COMPUTE_MODE", "rift")

Now we're ready to build!

๐ŸŒŠ Create a Stream Source for ingesting real-time dataโ€‹

First, let's define a local Stream Source that supports ingesting real-time data. Once productionized, this will give us an online HTTP endpoint to push events to in real-time which Tecton will then transform into features for online inference.

As part of our Stream Source, we also register a historical log of the stream via the batch_config parameter. Tecton uses this historical log for backfills and offline development.

tip

Alternatively, you can have Tecton maintain this historical log for you! Simply add the log_offline=True parameter to the PushConfig and omit the batch_config. With this setup, Tecton will log all ingested events and use those to backfill any features that use this source.

from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64


transactions_stream = StreamSource(
name="transactions_stream",
stream_config=PushConfig(),
batch_config=FileConfig(
uri="s3://tecton.ai.public/tutorials/transactions.pq",
file_format="parquet",
timestamp_field="timestamp",
),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)

๐Ÿ“Š Test the new Stream Sourceโ€‹

We can pull a range of offline data from a Stream Source's historical event log using get_dataframe().

start = datetime(2023, 5, 1)
end = datetime(2023, 8, 1)

df = transactions_stream.get_dataframe(start, end).to_pandas()
display(df.head(5))
indextimestampuser_idtransaction_idmerchantmerch_latmerch_longamount
02023-05-01 00:38:11.716917user_556006905033dd0f5c6ece08ff84d10227e83a6936Mervyn's"49.7518840""-140.759320"50.77
12023-05-01 00:45:03.031664user_82771213377b220dda23f7d8813062ad0f95c579c6Quality Stores"-14.0234110""-121.107220"95.36
22023-05-01 00:53:40.791126user_4409718407ff22d24fef0164070cdae9771d8bf9c3Gottschalks"3.7894785""53.168767"12.02
32023-05-01 01:18:38.115718user_6606710651e924834f85f588419f181e55cb61771dVisionworks"-14.0913895""-53.801625"866.82
42023-05-01 01:56:28.194464user_120083855538afd7d44a13c890ed5d3bbdf96d95a0Cook United"82.5886295""-152.906522"0.04

๐Ÿ‘ฉโ€๐Ÿ’ป Define and test streaming features locallyโ€‹

Now that we have a Stream Source defined, we are ready to create some features.

Let's use this data source to create the following 3 features:

  • A user's total transaction amount in the last 1 minute
  • A user's total transaction amount in the last 1 hour
  • A user's total transaction amount in the last 30 days

To build these features, we will define a Stream Feature View that consumes from our transactions Stream Source.

The Stream Feature View transformation operates on events in a Pandas Dataframe and can do any arbitrary projections, filters, or expressions as needed. It's just Python!

info

The Python transformation runs before the aggregations so you can transform data as needed before it is aggregated.

from tecton import Entity, stream_feature_view, Aggregate
from tecton.types import Field, String, Float64
from datetime import datetime, timedelta


user = Entity(name="user", join_keys=[Field("user_id", String)])


@stream_feature_view(
source=transactions_stream,
entities=[user],
mode="pandas",
timestamp_field="timestamp",
features=[
Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
],
)
def user_transaction_amount_totals(transactions_stream):
return transactions_stream[["user_id", "timestamp", "amount"]]

๐Ÿงช Test features interactivelyโ€‹

Now that we've defined and validated our Feature View, we can use get_features_in_range to produce a range of feature values and check out the feature data.

info

These features are calculated against the Stream Source's historical event log.

start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)

df = user_transaction_amount_totals.get_features_in_range(start_time=start, end_time=end).to_pandas()

display(df.head(5))
indexuser_idamount_sum_1m_continuousamount_sum_1h_continuousamount_sum_30d_continuous_valid_to_valid_from
0user_34008825950.011.6534.42000000000012022-01-19 00:00:002022-01-18 00:00:00
1user_44112996290.0259.5259.52022-01-25 00:00:002022-01-24 00:00:00
2user_90316521130.041.2241.222022-01-21 00:00:002022-01-20 00:00:00
3user_11171459480.00.05.082022-02-01 00:00:002022-01-03 00:00:00
4user_11208296590.00.0125.652022-01-26 00:00:002022-01-22 00:00:00

๐Ÿงฎ Generate training dataโ€‹

We can also include our new feature in a Feature Service and generate historical training data for a set of training events.

from tecton import FeatureService

fraud_detection_feature_service_streaming = FeatureService(
name="fraud_detection_feature_service_streaming", features=[user_transaction_amount_totals]
)

# Retrieve our dataset of historical transaction data
transactions_df = pd.read_parquet("s3://tecton.ai.public/tutorials/transactions.pq", storage_options={"anon": True})

# Retrieve our dataset of labels containing transaction_id and is_fraud (set to 1 if the transaction is fraudulent or 0 otherwise)
training_labels = pd.read_parquet("s3://tecton.ai.public/tutorials/labels.pq", storage_options={"anon": True})

# Join our label dataset to our transaction data to produce a list of training events
training_events = training_labels.merge(transactions_df, on=["transaction_id"], how="left")[
["user_id", "timestamp", "amount", "is_fraud"]
]

# Pass our training events into Tecton to generate point-in-time correct training data
training_data = fraud_detection_feature_service_streaming.get_features_for_events(training_events).to_pandas().fillna(0)
display(training_data.sample(5))
indexuser_idtimestampis_fraudamountuser_transaction_amount_totals__amount_sum_30d_continuoususer_transaction_amount_totals__amount_sum_1m_continuoususer_transaction_amount_totals__amount_sum_1h_continuous
0user_10287476362021-01-03 08:42:43.66840600.077.0977.0977.09
1user_11559401572021-01-21 03:27:42.56641100.043.0143.0143.01
2user_15677086462021-01-20 13:57:14.83261500.0536.1536.1536.1
3user_15677086462021-01-21 18:13:41.53506700.0608.2672.1672.16
4user_17553850632021-01-05 04:19:08.78210600.096.8496.8496.84

๐Ÿš€ Apply our Stream Source and Stream Feature View to a Workspace.โ€‹

Heads Up!

This section requires your organization to have its own Tecton account. But don't fret! If you are a user of explore.tecton.ai, we've done these steps for you. You can read through it and continue with the rest of the tutorial, picking back up at the "Ingest events and watch values update in real time!" section.

If you want to productionize your own features with your own data, you can sign up for an unrestricted free trial at tecton.ai/free-trial.

Once we are happy with our Stream Source and Stream Feature View we can copy the definitions into our Feature Repository and apply our changes to a live workspace using the Tecton CLI.

Note: The workspace must be a live workspace in order to push events to it.

tip

For more information on working with Feature Repositories or applying changes to workspaces, check out the Quick Start tutorial or Feature Development Workflow pages.

On our Feature View we've added four parameters to enable backfilling, online ingestion, and offline materialization to the Feature Store:

  • online=True
  • offline=True
  • feature_start_time=datetime(2020, 1, 1)
  • batch_schedule=timedelta(days=1)

feature_repo.py

from tecton import (
Entity,
BatchSource,
FileConfig,
stream_feature_view,
Aggregate,
StreamSource,
PushConfig,
FeatureService,
)
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta


transactions_stream = StreamSource(
name="transactions_stream",
stream_config=PushConfig(),
batch_config=FileConfig(
uri="s3://tecton.ai.public/tutorials/transactions.pq",
file_format="parquet",
timestamp_field="timestamp",
),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)

user = Entity(name="user", join_keys=[Field("user_id", String)])


@stream_feature_view(
source=transactions_stream,
entities=[user],
mode="pandas",
timestamp_field="timestamp",
features=[
Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
],
online=True,
offline=True,
feature_start_time=datetime(2020, 1, 1),
batch_schedule=timedelta(days=1),
)
def user_transaction_amount_totals(transactions_stream):
return transactions_stream[["user_id", "timestamp", "amount"]]


fraud_detection_feature_service_streaming = FeatureService(
name="fraud_detection_feature_service_streaming", features=[user_transaction_amount_totals]
)

โœ… Run the following commands in your terminal to select a workspace and apply your changes:

tecton login [your-org-account-name].tecton.ai
tecton workspace select [my-live-workspace]
tecton apply

โšก๏ธ Ingest events and watch values update in real time!โ€‹

Now that our Stream Source has been productionised, we can start sending events to it and watch our aggregations update in real-time!

note

This step requires generating and setting a Tecton API key.

To do this, you will need to create a new Service Account and give it access to read features from your workspace.

โœ… Head to the following URL to create a new service account (replace "explore" with your organization's account name in the URL as necessary). Be sure to save the API key!

https://explore.tecton.ai/app/settings/accounts-and-access/service-accounts?create-service-account=true

โœ… If you are using explore.tecton.ai, this account will automatically be given the necessary privileges to ingest stream events in the "prod" workspace. Otherwise, you should give the service account access to read features from your newly created workspace by following these steps:

  1. Navigate to the Service Account page by clicking on your new service account in the list at the URL above
  2. Click on "Assign Workspace Access"
  3. Select your workspace and give the service account the "Editor" role

โœ… Copy the generated API key into the code snippet below where it says your-api-key. Also be sure to replace the workspace name with your newly created workspace name if necessary.

Be sure to replace your-api-key with the key you generated above.

# Use your API key generated in the step above
TECTON_API_KEY = "your-api-key" # replace with your API key
WORKSPACE_NAME = "prod" # replace with your new workspace name if needed

tecton.set_credentials(tecton_api_key=TECTON_API_KEY)

ws = tecton.get_workspace(WORKSPACE_NAME)
transactions_stream_source = ws.get_data_source("transactions_stream")
fraud_detection_feature_service_streaming = ws.get_feature_service("fraud_detection_feature_service_streaming")

# Generate a random user_id for the next step
user_id = "".join(random.choices(string.ascii_letters + string.digits, k=10))

โญ๏ธ Try repeatedly running these steps in quick succession and watch feature values update in real-time!โญ๏ธ

You can try changing the transaction amount in the first step.

note

Service account permissions may take a few minutes to update. Also, your first ingestion call may take longer than the rest.

# Ingest events
try:
response = transactions_stream_source.ingest({"user_id": user_id, "timestamp": datetime.utcnow(), "amount": 100})
pprint(response)

except Exception as e:
print(
"Error: Your API key permissions may not yet have updated, or perhaps you didn't set the right API key and workspace name above.\n",
e,
)
# Read updated feature values
try:
features = fraud_detection_feature_service_streaming.get_online_features(join_keys={"user_id": user_id})
pprint(features.to_dict())

except Exception as e:
print(
"Error: Your API key permissions may not yet have updated, or perhaps you didn't set the right API key and workspace name above.\n",
e,
)

{'stream_user_transaction_amount_averages.amount_sum_1m_continuous': 100.0, 'stream_user_transaction_amount_averages.amount_sum_1h_continuous': 100.0, 'stream_user_transaction_amount_averages.amount_sum_30d_continuous': 100.0}

tip

The .ingest() method makes it easy to push events from a notebook. In production we recommend pushing events directly to the HTTP endpoint for the best performance.

The same goes for reading online data from a Feature Service via .get_online_features(). For best performance we recommend reading directly from the HTTP API or using our Python Client Library.

โญ๏ธ Conclusionโ€‹

There you have it! Using nothing but Python and a local dev environment we were able to get real-time features running online and ready to consume by a production model.

Next up, check out how Tecton lets you build powerful features that can only be calculated at the time of a prediction in the Building Realtime Features tutorial.

Was this page helpful?

๐Ÿง  Hi! Ask me anything about Tecton!

Floating button icon