# Test Stream Features

## Import libraries and select your workspace

```python
import tecton
import pandas
from datetime import datetime

ws = tecton.get_workspace("prod")
```
## Load a Stream Feature View

```python
fv = ws.get_feature_view("last_transaction_amount_sql")
fv.summary()
```
## Test Real-time Streaming Features

### Testing Rift Streaming Features

For Rift streaming features, you can test real-time ingestion and feature computation using the Stream Ingest API. This approach allows you to send test events and immediately verify that features are computed correctly.
The `.ingest()` method examples below require that the StreamSource has been applied to a live Tecton workspace using `tecton apply`. The method will not work on locally defined StreamSource objects.
#### Create a Stream Source with PushConfig

First, define a StreamSource that uses `PushConfig` for the Stream Ingest API:
```python
from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64

transactions_stream = StreamSource(
    name="transactions_stream",
    # Configure real-time ingestion via HTTP API
    stream_config=PushConfig(),
    # Historical data for testing and backfills
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amount", Float64)],
)
```
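Events sent through the Stream Ingest API must match the schema declared on the StreamSource. As a sanity check before ingesting, you can validate payloads in plain Python — a minimal sketch assuming the field names and types from the schema above; the helper itself is illustrative and not part of the Tecton SDK:

```python
from datetime import datetime

# Expected fields, mirroring the StreamSource schema above
EXPECTED_SCHEMA = {"user_id": str, "timestamp": datetime, "amount": float}

def validate_event(event: dict) -> list:
    """Return a list of problems; an empty list means the event matches the schema."""
    problems = []
    for field, expected_type in EXPECTED_SCHEMA.items():
        if field not in event:
            problems.append(f"missing field: {field}")
        elif not isinstance(event[field], expected_type):
            problems.append(
                f"{field}: expected {expected_type.__name__}, got {type(event[field]).__name__}"
            )
    for field in event:
        if field not in EXPECTED_SCHEMA:
            problems.append(f"unexpected field: {field}")
    return problems
```

A well-formed event yields an empty list, while a payload with a missing or mistyped field produces one problem string per violation.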
#### Ingest Test Events

After applying your StreamSource to a workspace with `tecton apply`, you can use the `.ingest()` method to send test events:
```python
# Get the applied StreamSource from your workspace
import tecton
from datetime import datetime

ws = tecton.get_workspace("your_workspace_name")
transactions_stream = ws.get_data_source("transactions_stream")

# Ingest a single test event
response = transactions_stream.ingest({"user_id": "test_user_123", "timestamp": datetime.utcnow(), "amount": 50.75})
print("Ingestion response:", response)
```
You can also ingest multiple events at once:

```python
# Ingest multiple test events
test_events = [
    {"user_id": "test_user_123", "timestamp": datetime.utcnow(), "amount": 25.50},
    {"user_id": "test_user_456", "timestamp": datetime.utcnow(), "amount": 100.00},
]
response = transactions_stream.ingest(test_events)
print("Batch ingestion response:")
print(response)
```
#### Verify Feature Updates

After ingesting events, you can immediately check that features were updated in the online store:

```python
# Get the updated features for the test user
fv = ws.get_feature_view("your_stream_feature_view_name")
features = fv.get_online_features({"user_id": "test_user_123"}).to_dict()
print("Updated feature values:")
print(features)
```
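Online writes are usually visible within moments, but a short retry loop makes automated checks robust against propagation delay. A minimal sketch — the `fetch_features` callable is a stand-in for a call like `fv.get_online_features(...).to_dict()`, and the helper is illustrative rather than part of the Tecton SDK:

```python
import time

def wait_for_feature(fetch_features, predicate, timeout=10.0, interval=0.5):
    """Poll fetch_features() until predicate(features) is True or timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        features = fetch_features()
        if predicate(features):
            return features
        time.sleep(interval)
    raise TimeoutError("feature value did not appear within timeout")

# With the real objects, usage might look like:
# features = wait_for_feature(
#     lambda: fv.get_online_features({"user_id": "test_user_123"}).to_dict(),
#     lambda f: f.get("amount") == 50.75,
# )
```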
#### Using Dry Run for Testing

You can validate your events without actually writing to the feature store by passing `dry_run=True`:

```python
# Test event validation without writing to the store
response = transactions_stream.ingest(
    {"user_id": "test_user_123", "timestamp": datetime.utcnow(), "amount": 75.25}, dry_run=True
)
print("Dry run validation result:")
print(response)
```
### Testing Spark Streaming Features

This section applies only to Spark streaming features; these methods must be run on a Spark cluster.

The `run_stream` method starts a Spark Structured Streaming job and writes the results to the specified temporary table:

```python
fv.run_stream(output_temp_table="output_temp_table")
```

The temporary table can then be queried to view real-time results. Run this code in a separate notebook cell:

```python
# Query the result from the streaming output table.
display(spark.sql("SELECT * FROM output_temp_table ORDER BY timestamp DESC LIMIT 5"))
```
|   | user_id | timestamp | amt |
|---|---|---|---|
| 0 | user_469998441571 | 2022-06-07 18:31:24 | 54.46 |
| 1 | user_460877961787 | 2022-06-07 18:31:21 | 73.02 |
| 2 | user_650387977076 | 2022-06-07 18:31:20 | 46.05 |
| 3 | user_699668125818 | 2022-06-07 18:31:17 | 59.24 |
| 4 | user_394495759023 | 2022-06-07 18:31:15 | 11.38 |
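The feature itself — the last transaction amount per user — can be reproduced over a small sample in pandas, which is handy for sanity-checking what the streaming job should emit. A sketch with made-up sample rows, using the column names from the table above:

```python
import pandas as pd

# Sample raw transaction events (hypothetical data)
events = pd.DataFrame(
    {
        "user_id": ["user_a", "user_b", "user_a"],
        "timestamp": pd.to_datetime(
            ["2022-06-07 18:31:15", "2022-06-07 18:31:20", "2022-06-07 18:31:24"]
        ),
        "amt": [11.38, 46.05, 54.46],
    }
)

# Last transaction amount per user: sort by time, keep each user's final row
last_amt = events.sort_values("timestamp").groupby("user_id", as_index=False).last()
```

Here `user_a` has two events, so only the later amount (54.46) survives, matching the "last transaction amount" semantics of the Feature View.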
## Get a Range of Feature Values from Offline Feature Store

`from_source=True` can be specified to bypass the offline store and compute features on-the-fly against the raw data source. This is useful for testing the expected output of feature values. Use `from_source=False` to see what data is materialized in the offline store.

```python
result_dataframe = fv.get_features_in_range(
    start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), from_source=True
).to_pandas()
display(result_dataframe)
```
|   | user_id | amt | _valid_from | _valid_to |
|---|---|---|---|---|
| 0 | user_1 | 76.45 | 2022-05-01 01:50:51 | 2022-05-02 00:00:00 |
| 1 | user_2 | 45.80 | 2022-05-01 02:05:39 | 2022-05-01 03:51:28 |
| 2 | user_2 | 1.43 | 2022-05-01 03:51:28 | 2022-05-02 00:00:00 |
| 3 | user_3 | 52.31 | 2022-05-01 02:41:42 | 2022-05-02 00:00:00 |
| 4 | user_4 | 64.15 | 2022-05-01 04:48:27 | 2022-05-02 00:00:00 |
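The `_valid_from` / `_valid_to` columns can be read as: each value is valid from its event's timestamp until the same user's next event, or until the query's `end_time` if no later event exists. A pandas sketch of that interpretation over the `user_2` rows from the table above:

```python
import pandas as pd

end_time = pd.Timestamp("2022-05-02")
events = pd.DataFrame(
    {
        "user_id": ["user_2", "user_2"],
        "amt": [45.80, 1.43],
        "timestamp": pd.to_datetime(["2022-05-01 02:05:39", "2022-05-01 03:51:28"]),
    }
)

events = events.sort_values(["user_id", "timestamp"])
events["_valid_from"] = events["timestamp"]
# Each interval closes when the same user's next event arrives, else at end_time
events["_valid_to"] = events.groupby("user_id")["timestamp"].shift(-1).fillna(end_time)
```

This reproduces the pattern in the table: `user_2`'s first value is superseded at 03:51:28, and the second value remains valid through the end of the queried range.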
## Read the Latest Features from Online Feature Store

```python
fv.get_online_features({"user_id": "user_3"}).to_dict()
```

Out: `{"amt": 180.6}`
## Read Historical Features from Offline Feature Store with Time-Travel

Create an events DataFrame with the events to look up. For more information on the events DataFrame, check out Selecting Sample Keys and Timestamps.

```python
events = pandas.DataFrame(
    {
        "user_id": ["user_3", "user_5"],
        "timestamp": [datetime(2022, 5, 1, 19), datetime(2022, 5, 6, 10)],
    }
)
display(events)
```
|   | user_id | timestamp |
|---|---|---|
| 0 | user_3 | 2022-05-01 19:00:00 |
| 1 | user_5 | 2022-05-06 10:00:00 |
`from_source=True` can be specified to bypass the offline store and compute features on-the-fly against the raw data source. However, this will be slower than reading feature data that has been materialized to the offline store.

```python
features_df = fv.get_features_for_events(events, from_source=True).to_pandas()
display(features_df)
```
|   | user_id | timestamp | last_transaction_amount_sql__amt |
|---|---|---|---|
| 0 | user_3 | 2022-05-01 19:00:00 | 52.31 |
| 1 | user_5 | 2022-05-06 10:00:00 | 58.68 |
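Under the hood, `get_features_for_events` performs a point-in-time join: for each event row it picks the latest feature value at or before that row's timestamp. The same semantics can be approximated in pandas with `merge_asof` — a sketch with hypothetical feature rows, not the actual offline store contents:

```python
import pandas as pd

# Hypothetical materialized feature values for user_3
feature_values = pd.DataFrame(
    {
        "user_id": ["user_3", "user_3"],
        "timestamp": pd.to_datetime(["2022-05-01 02:41:42", "2022-05-03 09:00:00"]),
        "amt": [52.31, 99.99],
    }
)
events = pd.DataFrame(
    {
        "user_id": ["user_3"],
        "timestamp": pd.to_datetime(["2022-05-01 19:00:00"]),
    }
)

# merge_asof matches each event to the latest feature row at or before it;
# both frames must be sorted by the "on" column
joined = pd.merge_asof(
    events.sort_values("timestamp"),
    feature_values.sort_values("timestamp"),
    on="timestamp",
    by="user_id",
)
```

The event at 19:00 matches the 02:41:42 feature value (52.31), not the later one — which is exactly the time-travel behavior shown in the table above.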