Version: Beta 🚧

Testing Stream Features

This guide covers how to test stream features in Tecton using offline retrieval methods with mock data, as well as real-time ingestion.

Overview

Testing stream features involves validating both the streaming transformation logic and the resulting feature values. Stream features can be tested using offline methods with mock data, or through real-time ingestion for end-to-end validation.

Testing Methods

Stream features support multiple testing approaches:

1. Testing Final Feature Values (Offline)

Stream features support the same offline testing methods as batch features, providing a consistent testing interface across feature types. This allows you to test streaming transformation logic without needing live streaming infrastructure.
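The examples in this guide pass a `mock_data` DataFrame through `mock_inputs` without showing how it is built. The sketch below shows one way to construct it and fail fast on schema mistakes. It assumes the stream source exposes `user_id`, `timestamp`, and `amt` columns (inferred from the example output, not from a real schema). `REQUIRED_COLUMNS` and `check_mock_columns` are hypothetical names, and the values are made up.

```python
from datetime import datetime

import pandas as pd

# Hypothetical schema: replace with the columns your stream transformation reads.
REQUIRED_COLUMNS = {"user_id", "timestamp", "amt"}

# Made-up mock events for illustration only.
mock_data = pd.DataFrame(
    {
        "user_id": ["user_1", "user_2", "user_2"],
        "timestamp": [
            datetime(2022, 5, 1, 1, 0),
            datetime(2022, 5, 1, 2, 0),
            datetime(2022, 5, 1, 3, 0),
        ],
        "amt": [12.5, 40.0, 7.25],
    }
)


def check_mock_columns(df: pd.DataFrame, required: set) -> None:
    """Raise if a required column is missing or the timestamp is not a datetime dtype."""
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"mock data is missing columns: {sorted(missing)}")
    if not pd.api.types.is_datetime64_any_dtype(df["timestamp"]):
        raise TypeError("timestamp column must have a datetime dtype")


check_mock_columns(mock_data, REQUIRED_COLUMNS)
```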

Time Range Testing

Use get_features_in_range() with from_source=True to compute stream feature values over a time range directly from raw data (here, mock data):

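from datetime import datetime

# Test stream feature values over a time range.
# stream_fv is your stream feature view; mock_data is a pandas DataFrame
# that matches the stream source's schema.
result_df = stream_fv.get_features_in_range(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    from_source=True,  # Compute from raw data
    mock_inputs={"stream_source": mock_data},  # Replace the stream source input with mock data
)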
Result

| user_id | amt | _valid_from | _valid_to |
|---|---|---|---|
| user_1 | 76.45 | 2022-05-01 01:50:51 | 2022-05-02 00:00:00 |
| user_2 | 45.8 | 2022-05-01 02:05:39 | 2022-05-02 03:51:28 |
| user_2 | 1.43 | 2022-05-01 03:51:28 | 2022-05-02 00:00:00 |
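To judge whether a result like this is correct, it can help to compute the expected `_valid_from` and `_valid_to` intervals independently from the same mock events. This is a minimal sketch that assumes a "last value" feature, where each value stays valid until the next event for the same `user_id` or until `end_time`. `expected_last_value_intervals` is a hypothetical helper, not part of the Tecton SDK, and the sample events are made up.

```python
from datetime import datetime

import pandas as pd


def expected_last_value_intervals(
    events: pd.DataFrame, start_time: datetime, end_time: datetime
) -> pd.DataFrame:
    """Hypothetical helper: expected validity intervals for a 'last value' feature.

    Events before start_time are ignored, so a value carried over from
    before the window is not modeled.
    """
    in_window = events[(events["timestamp"] >= start_time) & (events["timestamp"] < end_time)]
    df = in_window.sort_values(["user_id", "timestamp"]).copy()
    df["_valid_from"] = df["timestamp"]
    # The next event for the same user closes the current interval.
    df["_valid_to"] = df.groupby("user_id")["timestamp"].shift(-1)
    # Each user's latest event stays valid until the end of the window.
    df["_valid_to"] = df["_valid_to"].fillna(pd.Timestamp(end_time))
    return df[["user_id", "amt", "_valid_from", "_valid_to"]].reset_index(drop=True)


events = pd.DataFrame(
    {
        "user_id": ["user_1", "user_2", "user_2"],
        "timestamp": [
            datetime(2022, 5, 1, 1, 0),
            datetime(2022, 5, 1, 2, 0),
            datetime(2022, 5, 1, 3, 0),
        ],
        "amt": [12.5, 40.0, 7.25],
    }
)
expected = expected_last_value_intervals(events, datetime(2022, 5, 1), datetime(2022, 5, 2))
print(expected)
```

You could then compare this frame against `result_df.to_pandas()` with `pandas.testing.assert_frame_equal`, after sorting both frames the same way and aligning column dtypes (for example, time zones).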

Point-in-Time Testing

Use get_features_for_events() to test stream features for specific events:

import pandas as pd
from datetime import datetime

# Test stream features for specific events
# (stream_fv and mock_data as in the previous example)
events_df = pd.DataFrame(
    {"user_id": ["user_3", "user_5"], "timestamp": [datetime(2022, 5, 1, 19), datetime(2022, 5, 6, 10)]}
)

result_df = stream_fv.get_features_for_events(
    events=events_df, from_source=True, mock_inputs={"stream_source": mock_data}
)
Result

| user_id | timestamp | last_transaction_amount_sql__amt |
|---|---|---|
| user_3 | 2022-05-01 19:00:00 | 52.31 |
| user_5 | 2022-05-06 10:00:00 | 58.68 |
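The same idea works for point-in-time lookups: for each event, find the most recent mock value before the event's timestamp. This sketch uses `pandas.merge_asof` and assumes a "last value" feature whose value becomes visible only strictly after its own timestamp (`allow_exact_matches=False`). Tecton's exact boundary semantics may differ, so confirm them before relying on this. `expected_point_in_time` and the mock values are hypothetical.

```python
from datetime import datetime

import pandas as pd


def expected_point_in_time(events: pd.DataFrame, mock_data: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: attach the latest mock value strictly before each event."""
    # merge_asof requires both frames to be sorted by their time keys.
    left = events.sort_values("timestamp")
    right = mock_data.sort_values("timestamp").rename(columns={"timestamp": "_feature_ts"})
    merged = pd.merge_asof(
        left,
        right,
        left_on="timestamp",
        right_on="_feature_ts",
        by="user_id",  # only match rows for the same user
        direction="backward",  # look for the latest earlier value
        allow_exact_matches=False,  # ignore values stamped exactly at the event time
    )
    return merged.drop(columns="_feature_ts")


events_df = pd.DataFrame(
    {"user_id": ["user_3", "user_5"], "timestamp": [datetime(2022, 5, 1, 19), datetime(2022, 5, 6, 10)]}
)
mock_data = pd.DataFrame(
    {
        "user_id": ["user_3", "user_3", "user_5"],
        "timestamp": [datetime(2022, 5, 1, 10), datetime(2022, 5, 1, 20), datetime(2022, 5, 5, 8)],
        "amt": [10.0, 99.0, 20.0],
    }
)
expected = expected_point_in_time(events_df, mock_data)
print(expected)
```

Before comparing against the SDK output, you would rename `amt` to the feature's output column name (for example, `last_transaction_amount_sql__amt`).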

2. Testing Real-time Streaming (Rift)

For Rift streaming features, test real-time ingestion and feature computation using the Stream Ingest API:

note

The .ingest() method runs the stream transformation and writes the results to the online store. This requires that the StreamSource has been applied to a live Tecton workspace using tecton apply. The method will not work on locally defined StreamSource objects.

import tecton
from datetime import datetime

# Get the applied StreamSource from your workspace
ws = tecton.get_workspace("your_workspace_name")
stream_source = ws.get_data_source("transactions_stream")

# Ingest test events
test_events = [
    {"user_id": "test_user_123", "timestamp": datetime.utcnow(), "amount": 25.50},
    {"user_id": "test_user_456", "timestamp": datetime.utcnow(), "amount": 100.00},
]

response = stream_source.ingest(test_events)
print("Ingestion response:", response)

# Verify feature updates (ingested events may take a moment to be processed)
fv = ws.get_feature_view("your_stream_feature_view_name")
features = fv.get_online_features({"user_id": "test_user_123"}).to_dict()
print("Updated feature values:", features)

# Optional: Validate an event without writing to the online store using a dry run
response = stream_source.ingest(
    {"user_id": "test_user", "timestamp": datetime.utcnow(), "amount": 75.25}, dry_run=True
)
print("Validation result:", response)
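Before calling `.ingest()`, it can help to catch malformed payloads locally, such as an event missing its timestamp. This is a minimal sketch: `REQUIRED_FIELDS` and `validate_events` are hypothetical names, and the field list mirrors the example events above rather than a real schema. It does not replace the `dry_run=True` validation shown above.

```python
from datetime import datetime, timezone

# Hypothetical field list mirroring the example events above.
REQUIRED_FIELDS = ("user_id", "timestamp", "amount")


def validate_events(events):
    """Return a list of problems found in a list of event dicts (empty if none)."""
    problems = []
    for i, event in enumerate(events):
        for field in REQUIRED_FIELDS:
            if field not in event:
                problems.append(f"event {i}: missing '{field}'")
        ts = event.get("timestamp")
        if ts is not None and not isinstance(ts, datetime):
            problems.append(f"event {i}: timestamp is {type(ts).__name__}, expected datetime")
    return problems


test_events = [
    {"user_id": "test_user_123", "timestamp": datetime.now(timezone.utc), "amount": 25.50},
    {"user_id": "test_user", "amount": 75.25},  # no timestamp
]
print(validate_events(test_events))
```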

3. Testing Spark Streaming Features

For Spark streaming features, the run_stream() method starts a Spark Structured Streaming job and writes the results to the specified temporary table:

# Start the streaming job (must run in a notebook attached to a Spark cluster)
fv.run_stream(output_temp_table="output_temp_table")

The temporary table can then be queried to view real-time results. Run this code in a separate notebook cell:

# Query the result from the streaming output table
display(spark.sql("SELECT * FROM output_temp_table ORDER BY timestamp DESC LIMIT 5"))
Example Output

| user_id | timestamp | amt |
|---|---|---|
| user_1 | 2022-06-07 18:31:24 | 54.46 |
| user_2 | 2022-06-07 18:31:21 | 73.02 |
| user_3 | 2022-06-07 18:31:20 | 46.05 |
| user_4 | 2022-06-07 18:31:17 | 59.24 |
| user_5 | 2022-06-07 18:31:15 | 11.38 |
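Once a few rows have arrived, you might pull a small sample into pandas (for example with `spark.sql(...).toPandas()`) and assert basic invariants instead of inspecting the output by eye. This is a sketch: `check_stream_output` is a hypothetical helper, the columns follow the example output above, and the sample values are made up. Whether duplicate `(user_id, timestamp)` rows are an error depends on your transformation.

```python
import pandas as pd


def check_stream_output(df: pd.DataFrame, key: str = "user_id", ts: str = "timestamp") -> list:
    """Return invariant violations found in a sample of streaming output."""
    if df.empty:
        return ["no rows written yet"]
    problems = []
    if df[key].isna().any():
        problems.append(f"null values in {key}")
    if df[ts].isna().any():
        problems.append(f"null values in {ts}")
    if df.duplicated(subset=[key, ts]).any():
        problems.append(f"duplicate ({key}, {ts}) rows")
    return problems


sample = pd.DataFrame(
    {
        "user_id": ["user_1", "user_2", "user_2"],
        "timestamp": pd.to_datetime(
            ["2022-06-07 18:00:00", "2022-06-07 18:00:05", "2022-06-07 18:00:05"]
        ),
        "amt": [10.0, 20.0, 20.0],
    }
)
print(check_stream_output(sample))
```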

4. Testing Online Feature Store

Do not use get_online_features() to read features in production.

This method is intended for testing and does not have production-level performance. To read features online efficiently in production, see Reading Features for Inference.

Test the latest feature values from the online feature store using get_online_features():

# Test reading the latest features from the online store
online_result = fv.get_online_features({"user_id": "user_3"}).to_dict()
print(online_result)
Result
{"amt": 180.6}
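Feature values such as `amt` are floats, so exact equality checks on `to_dict()` output can be brittle. This is a minimal sketch of a tolerant comparison. `assert_features_close` is a hypothetical helper, and the expected values are whatever your mock or ingested events imply.

```python
import math


def assert_features_close(actual: dict, expected: dict, rel_tol: float = 1e-9, abs_tol: float = 1e-6) -> None:
    """Assert each expected feature is present; floats are compared with a tolerance, others exactly."""
    for name, want in expected.items():
        if name not in actual:
            raise AssertionError(f"missing feature {name!r}; got {sorted(actual)}")
        got = actual[name]
        if isinstance(want, float) and isinstance(got, (int, float)):
            if not math.isclose(got, want, rel_tol=rel_tol, abs_tol=abs_tol):
                raise AssertionError(f"{name}: got {got}, expected {want}")
        elif got != want:
            raise AssertionError(f"{name}: got {got!r}, expected {want!r}")


# Passes: the values differ only by floating-point noise.
assert_features_close({"amt": 180.6}, {"amt": 180.60000000001})
```

With the example above, this could be called as `assert_features_close(online_result, {"amt": 180.6})`.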

Mock Data Guidelines

Stream Data Requirements

  • Include all columns used in stream transformations
  • Provide realistic event timestamps and ordering
  • Include sufficient data volume for streaming window testing
  • Consider event arrival patterns (batching, delays)
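One way to cover these requirements is to generate mock events instead of writing them by hand. This sketch uses only the standard library. Event times follow exponential gaps (a Poisson arrival process), and a fraction of events receive a random delivery delay, so delivery order differs from event-time order. `generate_events` and its `arrival_time` field are hypothetical, and batching is not modeled.

```python
import random
from datetime import datetime, timedelta


def generate_events(n, start, mean_gap_s=30.0, max_delay_s=120.0, late_fraction=0.1,
                    users=("user_1", "user_2", "user_3"), seed=0):
    """Hypothetical generator of mock stream events, returned in delivery order."""
    rng = random.Random(seed)  # seeded so test data is reproducible
    events = []
    t = start
    for _ in range(n):
        # Exponential inter-arrival gaps with the given mean.
        t += timedelta(seconds=rng.expovariate(1.0 / mean_gap_s))
        # Some events are delivered late.
        delay = rng.uniform(0, max_delay_s) if rng.random() < late_fraction else 0.0
        events.append(
            {
                "user_id": rng.choice(users),
                "timestamp": t,
                "arrival_time": t + timedelta(seconds=delay),
                "amt": round(rng.uniform(1, 250), 2),
            }
        )
    # Sort by delivery time, as a stream consumer would see the events.
    return sorted(events, key=lambda e: e["arrival_time"])


stream = generate_events(500, datetime(2022, 5, 1))
out_of_order = sum(1 for a, b in zip(stream, stream[1:]) if b["timestamp"] < a["timestamp"])
print(f"{len(stream)} events, {out_of_order} delivered after a later-timestamped event")
```

Drop `arrival_time` before using the events as mock input, for example with `pd.DataFrame(stream).drop(columns="arrival_time")`.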

Best Practices

  • Test with realistic event rates and patterns
  • Include out-of-order events if applicable
  • Include duplicate events and verify that your transformation deduplicates them as intended
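For out-of-order and duplicate events, a useful check is that the expected final value per user does not change when you inject duplicates or shuffle the input. This sketch rests on two assumptions: the feature is a "last value" per `user_id`, and exact duplicates (same `user_id`, `timestamp`, and `amt`) should be ignored. Both helpers are hypothetical.

```python
import random
from datetime import datetime


def with_duplicates_and_shuffle(events, dup_fraction=0.2, seed=0):
    """Return a copy of events with some exact duplicates appended, in shuffled order."""
    rng = random.Random(seed)
    out = [dict(e) for e in events]
    out += [dict(e) for e in events if rng.random() < dup_fraction]
    rng.shuffle(out)
    return out


def latest_per_user(events):
    """Reference expectation: drop exact duplicates, then keep each user's latest event."""
    unique = {(e["user_id"], e["timestamp"], e["amt"]): e for e in events}
    latest = {}
    for e in unique.values():
        current = latest.get(e["user_id"])
        if current is None or e["timestamp"] > current["timestamp"]:
            latest[e["user_id"]] = e
    return latest


base = [
    {"user_id": "user_1", "timestamp": datetime(2022, 5, 1, 1), "amt": 1.0},
    {"user_id": "user_1", "timestamp": datetime(2022, 5, 1, 2), "amt": 2.0},
    {"user_id": "user_2", "timestamp": datetime(2022, 5, 1, 1), "amt": 3.0},
]
noisy = with_duplicates_and_shuffle(base, dup_fraction=1.0)
print(latest_per_user(noisy) == latest_per_user(base))
```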

Example: Testing Stream Ingestion End-to-End

This example demonstrates testing stream features with both offline testing and real-time ingestion validation:

import tecton
import pandas as pd
from datetime import datetime

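# .ingest() below writes test events to this workspace's online store,
# so use a development workspace rather than production.
ws = tecton.get_workspace("your_workspace_name")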
stream_fv = ws.get_feature_view("last_transaction_amount")

# Mock streaming data for offline testing
mock_stream_data = pd.DataFrame(
    {
        "user_id": ["user_1", "user_2", "user_3"] * 3,
        "timestamp": [
            datetime(2022, 5, 1, 1, 50, 51),
            datetime(2022, 5, 1, 2, 5, 39),
            datetime(2022, 5, 1, 3, 51, 28),
            datetime(2022, 5, 1, 15, 20, 15),
            datetime(2022, 5, 1, 16, 35, 42),
            datetime(2022, 5, 1, 17, 45, 33),
            datetime(2022, 5, 2, 9, 10, 22),
            datetime(2022, 5, 2, 10, 25, 18),
            datetime(2022, 5, 2, 11, 40, 55),
        ],
        "amt": [76.45, 45.8, 1.43, 89.2, 156.7, 23.1, 245.8, 67.3, 112.5],
    }
)

# 1. Test offline consistency with batch-like interface
offline_result = stream_fv.get_features_in_range(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    from_source=True,
    mock_inputs={"stream_source": mock_stream_data},
)
print("Offline stream testing:")
display(offline_result.to_pandas())

# 2. Test real-time ingestion (requires applied StreamSource)
stream_source = ws.get_data_source("transactions_stream")

# Ingest test events for real-time validation
test_events = [
    {"user_id": "test_user_1", "timestamp": datetime.utcnow(), "amount": 25.50},
    {"user_id": "test_user_2", "timestamp": datetime.utcnow(), "amount": 100.00},
]

response = stream_source.ingest(test_events)
print("Ingestion response:", response)

# Verify real-time feature updates
import time

time.sleep(2) # Allow processing time

features = stream_fv.get_online_features({"user_id": "test_user_1"}).to_dict()
print("Real-time feature values:", features)
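A fixed `time.sleep(2)` can make the real-time check flaky if processing takes longer than expected. Below is a sketch of a polling helper that retries until a condition holds or a timeout expires. `wait_for` is a hypothetical name, not a Tecton API.

```python
import time


def wait_for(condition, timeout_s=30.0, interval_s=1.0):
    """Poll condition() until it returns a truthy value, then return that value.

    Raises TimeoutError if timeout_s elapses first.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        result = condition()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout_s}s")
        time.sleep(interval_s)
```

For example, `wait_for(lambda: stream_fv.get_online_features({"user_id": "test_user_1"}).to_dict())`. Adjust the condition to check the specific feature value you expect, since the returned dict may be non-empty before the new events are reflected.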

This example demonstrates:

  • Offline Testing: Using the same interface as batch features for consistency
  • Real-time Validation: Testing actual streaming ingestion and feature computation
  • End-to-end Flow: From raw stream events to computed feature values
