Version: 1.1

Test Stream Features

Import libraries and select your workspace

import tecton
import pandas
from datetime import datetime

ws = tecton.get_workspace("prod")

Load a Stream Feature View

fv = ws.get_feature_view("last_transaction_amount_sql")
fv.summary()

Test Real-time Streaming Features

Testing Rift Streaming Features

For Rift streaming features, you can test real-time ingestion and feature computation using the Stream Ingest API. This approach allows you to send test events and immediately verify that features are computed correctly.

note

The .ingest() method examples below require that the StreamSource has been applied to a live Tecton workspace using tecton apply. The method will not work on locally defined StreamSource objects.

Create a Stream Source with PushConfig

First, define a StreamSource that uses PushConfig for the Stream Ingest API:

from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64

transactions_stream = StreamSource(
    name="transactions_stream",
    # Configure real-time ingestion via HTTP API
    stream_config=PushConfig(),
    # Historical data for testing and backfills
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)

Ingest Test Events

After applying your StreamSource to a workspace with tecton apply, you can use the .ingest() method to send test events:

# Get the applied StreamSource from your workspace
import tecton

ws = tecton.get_workspace("your_workspace_name")
transactions_stream = ws.get_data_source("transactions_stream")

# Ingest a single test event
response = transactions_stream.ingest({"user_id": "test_user_123", "timestamp": datetime.utcnow(), "amount": 50.75})

print("Ingestion response:", response)

You can also ingest multiple events at once:

# Ingest multiple test events
test_events = [
    {"user_id": "test_user_123", "timestamp": datetime.utcnow(), "amount": 25.50},
    {"user_id": "test_user_456", "timestamp": datetime.utcnow(), "amount": 100.00},
]

response = transactions_stream.ingest(test_events)
print("Batch ingestion response:")
print(response)

Verify Feature Updates

After ingesting events, you can immediately check that features were updated in the online store:

# Get the updated features for the test user
fv = ws.get_feature_view("your_stream_feature_view_name")
features = fv.get_online_features({"user_id": "test_user_123"}).to_dict()
print("Updated feature values:")
print(features)
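Stream ingestion is processed asynchronously, so an immediate read can race with the write. In automated tests, a small polling helper makes the check reliable. The sketch below is illustrative, not part of the Tecton SDK: `wait_for_feature` and its `fetch` callable are names introduced here for the example.

```python
import time


def wait_for_feature(fetch, key, expected, timeout_s=30.0, poll_s=1.0, sleep=time.sleep):
    """Poll `fetch(key)` until it returns `expected` or the timeout elapses.

    `fetch` is any callable that returns the current online feature value,
    e.g.: lambda uid: fv.get_online_features({"user_id": uid}).to_dict().get("amt")
    """
    deadline = time.monotonic() + timeout_s
    while True:
        value = fetch(key)
        if value == expected:
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(f"feature for {key!r} never reached {expected!r} (last seen: {value!r})")
        sleep(poll_s)


# Hypothetical usage against the ingested event above:
# wait_for_feature(
#     lambda uid: fv.get_online_features({"user_id": uid}).to_dict().get("amt"),
#     "test_user_123",
#     50.75,
# )
```

Injecting `sleep` as a parameter keeps the helper easy to unit-test without real delays.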

Using Dry Run for Testing

You can validate your events without actually writing to the feature store:

# Test event validation without writing to the store
response = transactions_stream.ingest(
    {"user_id": "test_user_123", "timestamp": datetime.utcnow(), "amount": 75.25}, dry_run=True
)
print("Dry run validation result:")
print(response)
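dry_run validates events server-side. For fast feedback in unit tests, you can also pre-validate events locally before sending them. The sketch below mirrors the field names and types of the StreamSource schema defined above; it is a local convenience check, not the validation the Stream Ingest API itself performs.

```python
from datetime import datetime

# Mirrors the StreamSource schema above: Field("user_id", String),
# Field("timestamp", Timestamp), Field("amount", Float64).
SCHEMA = {"user_id": str, "timestamp": datetime, "amount": float}


def validate_event(event, schema=SCHEMA):
    """Return a list of problems; an empty list means the event looks sendable."""
    problems = []
    for field, expected_type in schema.items():
        if field not in event:
            problems.append(f"missing field: {field}")
        elif not isinstance(event[field], expected_type):
            problems.append(f"{field}: expected {expected_type.__name__}, got {type(event[field]).__name__}")
    for field in event:
        if field not in schema:
            problems.append(f"unexpected field: {field}")
    return problems
```

Running this before `.ingest()` surfaces malformed test events without a network round trip.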

Testing Spark Streaming Features

note

This section only applies to Spark streaming features. These methods must be run on a Spark cluster.

The run_stream method will start a Spark Structured Streaming job and write the results to the specified temporary table.

fv.run_stream(output_temp_table="output_temp_table")

The temporary table can then be queried to view real-time results. Run this code in a separate notebook cell.

# Query the result from the streaming output table.
display(spark.sql("SELECT * FROM output_temp_table ORDER BY timestamp DESC LIMIT 5"))
   user_id             timestamp            amt
0  user_469998441571   2022-06-07 18:31:24  54.46
1  user_460877961787   2022-06-07 18:31:21  73.02
2  user_650387977076   2022-06-07 18:31:20  46.05
3  user_699668125818   2022-06-07 18:31:17  59.24
4  user_394495759023   2022-06-07 18:31:15  11.38

Get a Range of Feature Values from Offline Feature Store

from_source=True can be specified to bypass the offline store and compute features on-the-fly against the raw data source. This is useful for testing the expected output of feature values.

Use from_source=False to see what data is materialized in the offline store.

result_dataframe = fv.get_features_in_range(
    start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), from_source=True
).to_pandas()
display(result_dataframe)
   user_id  amt    _valid_from          _valid_to
0  user_1   76.45  2022-05-01 01:50:51  2022-05-02 00:00:00
1  user_2   45.8   2022-05-01 02:05:39  2022-05-02 03:51:28
2  user_2   1.43   2022-05-01 03:51:28  2022-05-02 00:00:00
3  user_3   52.31  2022-05-01 02:41:42  2022-05-02 00:00:00
4  user_4   64.15  2022-05-01 04:48:27  2022-05-02 00:00:00

Read the Latest Features from Online Feature Store

fv.get_online_features({"user_id": "user_3"}).to_dict()
Out: {"amt": 180.6}
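In automated tests, online reads like this are often wrapped in a small comparison helper that tolerates floating-point noise. A sketch (`assert_features_close` is an illustrative name, not a Tecton API):

```python
import math


def assert_features_close(actual, expected, rel_tol=1e-6):
    """Compare a feature dict from get_online_features().to_dict() to expected values."""
    assert actual.keys() == expected.keys(), f"key mismatch: {actual.keys()} vs {expected.keys()}"
    for name, want in expected.items():
        got = actual[name]
        if isinstance(want, float):
            assert math.isclose(got, want, rel_tol=rel_tol), f"{name}: {got} != {want}"
        else:
            assert got == want, f"{name}: {got!r} != {want!r}"


# Hypothetical usage with the read above:
# assert_features_close(fv.get_online_features({"user_id": "user_3"}).to_dict(), {"amt": 180.6})
```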

Read Historical Features from Offline Feature Store with Time-Travel

Create an events DataFrame with the keys and timestamps to look up. For more information on the events DataFrame, see Selecting Sample Keys and Timestamps.

events = pandas.DataFrame(
    {
        "user_id": ["user_3", "user_5"],
        "timestamp": [datetime(2022, 5, 1, 19), datetime(2022, 5, 6, 10)],
    }
)
display(events)
   user_id  timestamp
0  user_3   2022-05-01 19:00:00
1  user_5   2022-05-06 10:00:00

from_source=True can be specified to bypass the offline store and compute features on-the-fly against the raw data source. However, this will be slower than reading feature data that has been materialized to the offline store.

features_df = fv.get_features_for_events(events, from_source=True).to_pandas()
display(features_df)
   user_id  timestamp            last_transaction_amount_sql__amt
0  user_3   2022-05-01 19:00:00  52.31
1  user_5   2022-05-06 10:00:00  58.68
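Conceptually, this time-travel lookup is a point-in-time correct join: each event row is matched with the feature value from the latest feature record at or before the event's timestamp. The pandas sketch below illustrates the idea with invented data using `merge_asof`; the real computation is performed by Tecton, not client-side.

```python
import pandas as pd

# Toy feature log: last transaction amount per user over time (invented values).
feature_log = pd.DataFrame(
    {
        "user_id": ["user_3", "user_3", "user_5"],
        "timestamp": pd.to_datetime(["2022-05-01 02:41:42", "2022-05-02 07:00:00", "2022-05-06 09:30:00"]),
        "amt": [52.31, 12.00, 58.68],
    }
).sort_values("timestamp")

events = pd.DataFrame(
    {
        "user_id": ["user_3", "user_5"],
        "timestamp": pd.to_datetime(["2022-05-01 19:00:00", "2022-05-06 10:00:00"]),
    }
).sort_values("timestamp")

# For each event, take the latest feature row at or before the event timestamp.
joined = pd.merge_asof(events, feature_log, on="timestamp", by="user_id", direction="backward")
```

The backward direction is what prevents feature leakage: a feature recorded after the event's timestamp is never joined to it.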
