Skip to main content
Version: 1.2

Unit Testing for Rift Feature Views

Example: Test Rift Batch Feature Views with Snowflake

note

To use Snowflake with Tecton in the unit tests below, you'll need to install the Snowflake connector: pip install snowflake-connector-python. For more information, see the snowflake-connector-python package.

Here's an example of configuring SnowflakeContext for batch feature views:

@pytest.fixture(scope="module", autouse=True)
def configure_snowflake_session():
    """Configure the Snowflake connection Tecton uses for every test in this module.

    Assumes the Snowflake connection credentials are stored in the environment
    variables `SNOWFLAKE_USER`, `SNOWFLAKE_PASSWORD`, and `SNOWFLAKE_ACCOUNT`.
    """
    connection_parameters = {
        # Your username in the Snowflake account that you're using with Tecton.
        "user": os.environ["SNOWFLAKE_USER"],
        # Your password in the Snowflake account that you're using with Tecton.
        # Not needed if using the authenticator parameter instead.
        "password": os.environ["SNOWFLAKE_PASSWORD"],
        # The Snowflake account you're using with Tecton (takes the form
        # <SNOWFLAKE_ACCOUNT>.snowflakecomputing.com).
        "account": os.environ["SNOWFLAKE_ACCOUNT"],
        # The Snowflake warehouse you're using to run unit test queries.
        "warehouse": "TECTON_WAREHOUSE",
        # Database and schema are required: Tecton creates various temporary
        # objects in them.
        "database": "TECTON",
        "schema": "PUBLIC",
    }
    # Tecton will use these Snowflake connection parameters for all interactive queries.
    tecton.snowflake_context.set_connection(connection_parameters)


def test_user_credit_card_issuer_ghf():
    """Exercise user_credit_card_issuer via get_features_for_events with a mocked source."""
    # Mocked rows for the fraud_users_batch source: one signup per user.
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
        }
    )

    # Retrieval events, including a user that is absent from the mocked source.
    events_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_not_found"],
            "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
        }
    )

    # Simulate materializing features for May 1st.
    output = user_credit_card_issuer.get_features_for_events(
        events_df, mock_inputs={"fraud_users_batch": input_pandas_df}
    )
    actual = output.to_pandas()

    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_not_found"],
            "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
            "user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None],
        }
    )

    # NOTE: It is important to sort the dataframe to avoid test flakes.
    sort_cols = ["user_id", "timestamp"]
    actual = actual.sort_values(sort_cols).reset_index(drop=True)
    expected = expected.sort_values(sort_cols).reset_index(drop=True)

    pandas.testing.assert_frame_equal(actual, expected)

Example: Test Rift Stream Feature Views

Rift Stream Feature Views can also be unit tested using mock_inputs. Stream Feature Views in Rift work with StreamSources that have a PushConfig.

Here's an example of a Stream Feature View with aggregations and its unit test:

from tecton import StreamSource, PushConfig, FileConfig, stream_feature_view, Entity, Aggregate
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta

# Define the stream source
# Stream source backed by a PushConfig, with a Parquet file as the batch source.
transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/transactions.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    # Schema of the records pushed to the stream.
    schema=[
        Field("user_id", String),
        Field("timestamp", Timestamp),
        Field("amount", Float64),
    ],
)

# Entity keyed on user_id; rows are joined to features by this key.
user = Entity(
    name="user",
    join_keys=[Field("user_id", String)],
)

# Define the stream feature view with aggregations
# Stream feature view summing transaction amounts over three time windows.
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("amount", Float64), function="sum", time_window=timedelta(days=30)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="User transaction amount totals over different time windows",
)
def user_transaction_amount_totals(transactions_stream):
    """Project the raw stream down to the columns the aggregations consume."""
    return transactions_stream[["user_id", "timestamp", "amount"]]

Unit test for the aggregation Stream Feature View:

import pandas
from datetime import datetime, timedelta
from fraud.features.stream_features.user_transaction_amount_totals import user_transaction_amount_totals


def test_user_transaction_amount_totals():
    """Check the windowed sum aggregations produced by user_transaction_amount_totals."""
    # Mocked stream events: three transactions for user_1, two for user_2.
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_2", "user_1"],
            "timestamp": [
                datetime(2022, 5, 1, 10, 0, 0),
                datetime(2022, 5, 1, 10, 30, 0),
                datetime(2022, 5, 1, 11, 0, 0),
                datetime(2022, 5, 1, 11, 15, 0),
                datetime(2022, 5, 1, 12, 0, 0),
            ],
            "amount": [100.0, 50.0, 200.0, 75.0, 25.0],
        }
    )

    # Both users request features at the same point in time.
    retrieval_time = datetime(2022, 5, 1, 12, 0, 0)
    events_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2"],
            "timestamp": [retrieval_time, retrieval_time],
        }
    )

    # Run the feature view against the mocked stream input.
    output = user_transaction_amount_totals.get_features_for_events(
        events_df, mock_inputs={"transactions_stream": input_pandas_df}
    )
    actual = output.to_pandas()

    # Expected aggregated values based on the time windows.
    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2"],
            "timestamp": [retrieval_time, retrieval_time],
            "user_transaction_amount_totals__amount_sum_1m": [25.0, 0.0],  # Only last transaction in 1-minute window
            "user_transaction_amount_totals__amount_sum_1h": [75.0, 75.0],  # Transactions in last hour
            "user_transaction_amount_totals__amount_sum_30d": [175.0, 275.0],  # All transactions in 30-day window
        }
    )

    # Sort to ensure consistent ordering.
    actual = actual.sort_values(["user_id"]).reset_index(drop=True)
    expected = expected.sort_values(["user_id"]).reset_index(drop=True)

    pandas.testing.assert_frame_equal(actual, expected)

Here's an example of a simpler Stream Feature View without aggregations and its unit test:

from tecton import stream_feature_view, Attribute
from tecton.types import Float64
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# Non-aggregate stream feature view: stores the raw amount per event.
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    features=[Attribute("amount", Float64)],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    ttl=timedelta(days=30),
    description="Last transaction amount for a user",
)
def last_transaction_amount(transactions_stream):
    """Select only the key, timestamp, and amount columns from the stream."""
    return transactions_stream[["user_id", "timestamp", "amount"]]

Unit test for the simple Stream Feature View:

import pandas
from datetime import datetime
from fraud.features.stream_features.last_transaction_amount import last_transaction_amount


def test_last_transaction_amount():
    """Check that last_transaction_amount passes each user's amount through unchanged."""
    # One transaction per user; each event requests features at its own timestamp.
    event_times = [
        datetime(2022, 5, 1, 10, 0, 0),
        datetime(2022, 5, 1, 11, 0, 0),
        datetime(2022, 5, 1, 12, 0, 0),
    ]
    user_ids = ["user_1", "user_2", "user_3"]
    amounts = [100.0, 200.0, 50.0]

    # Mocked stream input.
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": user_ids,
            "timestamp": event_times,
            "amount": amounts,
        }
    )

    events_df = pandas.DataFrame(
        {
            "user_id": user_ids,
            "timestamp": event_times,
        }
    )

    # Run the feature view against the mocked stream input.
    output = last_transaction_amount.get_features_for_events(
        events_df, mock_inputs={"transactions_stream": input_pandas_df}
    )
    actual = output.to_pandas()

    expected = pandas.DataFrame(
        {
            "user_id": user_ids,
            "timestamp": event_times,
            "last_transaction_amount__amount": amounts,
        }
    )

    pandas.testing.assert_frame_equal(actual, expected)

Was this page helpful?