
Building Streaming Features with Tecton


Want to make your ML models react to data as it happens? Streaming features let you capture and respond to events in real-time, but building them traditionally requires complex streaming infrastructure and careful coordination between training and serving.

In this tutorial, you'll build a streaming feature pipeline that processes transaction data in real-time - perfect for fraud detection, user behavior analysis, or any application that needs to respond quickly to new data. You'll do it all using just Python, and we'll handle the streaming infrastructure for you.

What You'll Build

You'll create a streaming pipeline that:

  • Ingests transaction events in real-time via HTTP
  • Computes running transaction totals over multiple time windows (1 min, 1 hour, 30 days)
  • Serves feature values with millisecond latency
  • Maintains consistency between training and serving

What You'll Learn

  • How to create streaming features using Python
  • How to test features interactively in your notebook
  • How to generate training data from streaming features
  • How to serve features in production

Time to Complete: 15-20 minutes

No prior Tecton experience needed - if you're comfortable with Python, you can complete this tutorial. Let's get started by setting up your environment!

Prerequisites

Before we dive into building features, let's get your environment set up. You'll need Python >= 3.8 to get started.

1. Install Required Libraries

Run this command to install the Tecton SDK and supporting libraries:

!pip install 'tecton[rift]==1.0.0' gcsfs s3fs -q

2. Connect to Tecton

Log in to your Tecton account (replace explore.tecton.ai with your organization's URL if different):

import tecton

tecton.login("explore.tecton.ai")

When you run this command, you'll be prompted to:

  1. Open a browser window
  2. Click a button to generate an authentication token
  3. Copy the token back into your notebook
  4. Press Enter to continue

3. Set Up Python Imports

from tecton import *
from tecton.types import *
from datetime import datetime, timedelta
import pandas as pd

# Configure Tecton to use Rift for compute
tecton.conf.set("TECTON_OFFLINE_RETRIEVAL_COMPUTE_MODE", "rift")
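
If you want to confirm the setup before moving on, here's a quick optional check. The who_am_i helper is an assumption about your SDK version; skip it if yours doesn't provide it:

# Confirm the pinned SDK version installed correctly
print(tecton.__version__)

# Assumed utility for inspecting the logged-in identity;
# skip this line if your SDK version doesn't provide it
tecton.who_am_i()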

Not yet a Tecton user? Sign up at tecton.ai/explore for a free account to try this tutorial.

That's it for setup! Your environment is ready for building streaming features. Let's create your first streaming pipeline.

Part 1: Your First Streaming Feature

Imagine we've been tasked with preventing credit card fraud for an ecommerce business. To do this, we'll need to monitor a stream of transactions to look for unusual buying patterns from our customers.

Let's start by building a streaming pipeline that can process real-time transaction data. We'll start simple, but quickly build up to features that can capture spending patterns across multiple time windows.

First, let's define our stream source to specify where the data will come from:

from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64

transactions_stream = StreamSource(
    name="transactions_stream",
    # Configure real-time ingestion via HTTP API
    stream_config=PushConfig(),
    # Historical data for testing and backfills
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)

Let's pull some sample data to see what we're working with:

start = datetime(2023, 5, 1)
end = datetime(2023, 8, 1)

df = transactions_stream.get_dataframe(start, end).to_pandas()
print("Sample transaction data:")
display(df.head(5))

You'll see transaction data like this:

index  timestamp                   user_id          amount
0      2023-05-01 00:38:11.716917  user_5560069050  50.77
1      2023-05-01 00:45:03.031664  user_8277121337  95.36
2      2023-05-01 00:53:40.791126  user_4409718407  12.02
3      2023-05-01 01:18:38.115718  user_6606710651  866.82
4      2023-05-01 01:56:28.194464  user_1200838555  0.04

Now let's create our first streaming feature! We'll calculate transaction totals over three different time windows:

from tecton import Entity, stream_feature_view, Aggregate
from tecton.types import Field, String, Float64
from datetime import datetime, timedelta

user = Entity(name="user", join_keys=[Field("user_id", String)])


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    features=[
        # Track spend in the last minute
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
        # Track spend in the last hour
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
        # Track spend in the last 30 days
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
    ],
)
def user_transaction_amount_totals(transactions_stream):
    return transactions_stream[["user_id", "timestamp", "amount"]]

Let's test our feature with some historical data:

start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)

df = user_transaction_amount_totals.get_features_in_range(start_time=start, end_time=end).to_pandas()

print("Sample feature values:")
display(df.head(5))

You'll see output like this:

index  user_id          amount_sum_1m_continuous  amount_sum_1h_continuous  amount_sum_30d_continuous
0      user_3400882595  0.0                       11.65                     34.42
1      user_4411299629  0.0                       259.5                     259.5
2      user_9031652113  0.0                       41.22                     41.22
3      user_1117145948  0.0                       0.0                       5.08
4      user_1120829659  0.0                       0.0                       125.65
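
The _continuous suffix in the column names reflects that these windows slide forward with every event rather than updating on a fixed interval. To build intuition for what a sliding-window sum means, here's a purely local pandas illustration; this is not how Tecton computes the feature, just a sanity-check sketch over the raw events (re-pulled here since the df variable above now holds feature values):

# Local illustration only - not Tecton's implementation
raw_df = transactions_stream.get_dataframe(datetime(2023, 5, 1), datetime(2023, 8, 1)).to_pandas()

# Compute a time-based rolling 1-hour sum for a single user with pandas
one_user = (
    raw_df[raw_df["user_id"] == raw_df["user_id"].iloc[0]]
    .sort_values("timestamp")
    .set_index("timestamp")
)
one_user["amount_sum_1h"] = one_user["amount"].rolling("1h").sum()
print(one_user[["amount", "amount_sum_1h"]].head())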

Great! You've just created your first streaming feature pipeline. Let's break down what we built:

  1. A StreamSource that can accept real-time data via HTTP
  2. A user entity to identify who each transaction belongs to
  3. A feature that calculates running transaction totals over three windows:
    • 1 minute window for immediate spending
    • 1 hour window for recent patterns
    • 30 day window for long-term patterns
  4. Tested everything using historical data
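
Before moving on, note that sum is only one of the built-in aggregation functions; mean, count, max, and others follow the same pattern. Here's a sketch of a variant feature view (the name user_transaction_amount_stats is made up for illustration; the source and entity are the ones defined above):

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    features=[
        # How many transactions occurred in the last hour
        Aggregate(input_column=Field("amount", Float64), function="count", time_window=timedelta(hours=1)),
        # Average transaction size over the last 30 days
        Aggregate(input_column=Field("amount", Float64), function="mean", time_window=timedelta(days=30)),
    ],
)
def user_transaction_amount_stats(transactions_stream):
    return transactions_stream[["user_id", "timestamp", "amount"]]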

In the next section, we'll use this feature to generate point-in-time correct training data, and then in Part 3 we'll send in live events and watch the features update instantly!

Part 2: Generating Training Data

Now that we've built our basic streaming feature, let's see how we can use it to generate training data for our models. One of the powerful aspects of Tecton's streaming features is that we can use historical data to generate point-in-time correct training datasets, ensuring our models learn from accurate historical patterns.

First, let's create a Feature Service to organize our features:

from tecton import FeatureService

fraud_detection_feature_service_streaming = FeatureService(
    name="fraud_detection_feature_service_streaming", features=[user_transaction_amount_totals]
)

Now let's generate some training data that combines historical transactions with fraud labels:

# Retrieve our dataset of historical transaction data
transactions_df = pd.read_parquet("s3://tecton.ai.public/tutorials/transactions.pq", storage_options={"anon": True})

# Load our fraud labels dataset
training_labels = pd.read_parquet("s3://tecton.ai.public/tutorials/labels.pq", storage_options={"anon": True})

# Join our labels to transactions to create training events
training_events = training_labels.merge(transactions_df, on=["transaction_id"], how="left")[
    ["user_id", "timestamp", "amount", "is_fraud"]
]

# Generate point-in-time correct training data
training_data = fraud_detection_feature_service_streaming.get_features_for_events(training_events).to_pandas().fillna(0)

print("Sample training data with streaming features:")
display(training_data.sample(5))

You'll see output like this:

index  user_id          timestamp                   is_fraud  amount  amount_sum_30d_continuous  amount_sum_1m_continuous  amount_sum_1h_continuous
0      user_1028747636  2021-01-03 08:42:43.668406  0         77.09   77.09                      77.09                     77.09
1      user_1155940157  2021-01-21 03:27:42.566411  0         43.01   43.01                      43.01                     43.01
2      user_1567708646  2021-01-20 13:57:14.832615  0         536.1   536.1                      536.1                     536.1
3      user_1567708646  2021-01-21 18:13:41.535067  0         608.26  72.16                      72.16                     72.16
4      user_1755385063  2021-01-05 04:19:08.782106  0         96.84   96.84                      96.84                     96.84
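
Because training_data already pairs point-in-time correct features with fraud labels, it can feed straight into a model. Here's a minimal sketch, assuming scikit-learn is installed (it isn't part of the setup above):

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Use the three windowed totals as model inputs
feature_columns = [
    "amount_sum_1m_continuous",
    "amount_sum_1h_continuous",
    "amount_sum_30d_continuous",
]
X = training_data[feature_columns]
y = training_data["is_fraud"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = LogisticRegression().fit(X_train, y_train)
print("Holdout accuracy:", model.score(X_test, y_test))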

What's powerful about this?

  1. Point-in-time Correctness: The features we get for training data only use information that would have been available at the time of each transaction. This prevents data leakage.

  2. Consistent Features: The same feature calculations we use in training will be used in production - no unexpected differences between training and serving.

  3. Fresh Data: In production, these features will update in real-time as new transactions come in.

Ready to see these features update in real-time? Let's move on to Part 3 where we'll productionize our pipeline and send in live data!

Part 3: Going to Production

Now for the fun part - let's see these features update in real-time! We'll set up our service account, send in some live transactions, and watch the features update instantly.

First, let's set up our connection using a Tecton API key:

import random
import string

# Replace with your API key from https://explore.tecton.ai/app/settings/accounts-and-access/service-accounts
TECTON_API_KEY = "your-api-key"  # replace with your API key
WORKSPACE_NAME = "prod"  # replace with your workspace name if different

tecton.set_credentials(tecton_api_key=TECTON_API_KEY)

# Get our data source and feature service
ws = tecton.get_workspace(WORKSPACE_NAME)
transactions_stream_source = ws.get_data_source("transactions_stream")
fraud_detection_feature_service_streaming = ws.get_feature_service("fraud_detection_feature_service_streaming")

# Generate a random test user ID so repeated runs don't collide
user_id = "".join(random.choices(string.ascii_letters + string.digits, k=10))
print("Generated test user ID:", user_id)

Now let's send in a transaction and watch our features update in real-time:

# Send in a new transaction
response = transactions_stream_source.ingest({"user_id": user_id, "timestamp": datetime.utcnow(), "amount": 100.00})
print("Ingested transaction:")
print(response)

# Immediately fetch the updated features
features = fraud_detection_feature_service_streaming.get_online_features(join_keys={"user_id": user_id}).to_dict()
print("\nUpdated feature values:")
print(features)

You'll see output like this:

Generated test user ID: xK9nPq2Rzm

Ingested transaction:
{'workspaceName': 'prod', 'ingestMetrics': {'featureViewIngestMetrics': [{'featureViewName': 'user_transaction_amount_totals', 'onlineRecordIngestCount': '1'}]}}

Updated feature values:
{
'amount_sum_1h_continuous': 100.0,
'amount_sum_1m_continuous': 100.0,
'amount_sum_30d_continuous': 100.0
}

Try running the ingest code multiple times - you'll notice:

  • The 1-minute window updates quickly
  • The 1-hour window accumulates transactions
  • The 30-day window keeps a long-term total
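
Rather than re-running the cell by hand, a small loop makes this behavior visible. This sketch reuses only the calls shown above; the 70-second pause lets each transaction age out of the 1-minute window before the next read:

import time

for i in range(3):
    transactions_stream_source.ingest({"user_id": user_id, "timestamp": datetime.utcnow(), "amount": 10.00})
    # Wait just over a minute so the 1-minute window rolls off,
    # while the 1-hour and 30-day windows keep accumulating
    time.sleep(70)
    features = fraud_detection_feature_service_streaming.get_online_features(join_keys={"user_id": user_id}).to_dict()
    print(f"After transaction {i + 1}:", features)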

Production Best Practices

  • For best performance, use the HTTP API directly instead of .ingest() (see the sketch below)
  • Monitor your feature freshness in the Tecton UI
  • Set up alerts for serving latency and feature distribution changes
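
The first bullet deserves a sketch. The endpoint URL, headers, and payload shape below are assumptions based on Tecton's Stream Ingest API documentation; verify them against your cluster's docs before relying on this in production:

import requests

# Assumed endpoint and payload shape for Tecton's Stream Ingest API;
# check your cluster's documentation for the current contract
resp = requests.post(
    "https://preview.explore.tecton.ai/ingest",  # replace with your cluster's ingest URL
    headers={"Authorization": f"Tecton-key {TECTON_API_KEY}"},
    json={
        "workspace_name": WORKSPACE_NAME,
        "dry_run": False,
        "records": {
            "transactions_stream": [
                {"record": {"user_id": user_id, "timestamp": datetime.utcnow().isoformat(), "amount": 25.0}}
            ]
        },
    },
)
print(resp.status_code, resp.json())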

Wrap-up

Congratulations! You've built a complete streaming feature pipeline that can:

  • Ingest real-time transaction data
  • Calculate features over multiple time windows (1 minute, 1 hour, 30 days)
  • Generate training data that's point-in-time correct
  • Serve feature values with sub-second latency

What We Built

  1. A streaming data source that accepts real-time events
  2. A feature view that computes running totals over three time windows:
    • 1 minute (for very recent activity)
    • 1 hour (for recent patterns)
    • 30 days (for long-term patterns)
  3. A feature service that can serve these values in real-time and generate training data

Key Concepts You've Learned

  • How to define a StreamSource for real-time data
  • How to create time-windowed aggregation features
  • How to test features with historical data
  • How to send and receive real-time updates

Next Steps

  1. Explore More Features

    • Try different aggregation functions (mean, count, max)
    • Add more time windows
    • Combine multiple features together
  2. Ready for Production? Try:

    • Setting up monitoring
    • Configuring alerts
    • Adding feature validation
    • Implementing retry logic for ingestion

Want to use Tecton with your own data? Sign up for a free trial at tecton.ai/free-trial to get started!
