Skip to main content
Version: 0.9

Unit Tests

note

If you are using Spark and you will be running unit tests, the Tecton CLI must be installed using one of the following commands:

  • To install with Pyspark 3.1: pip install 'tecton[pyspark]'
  • To install with Pyspark 3.2: pip install 'tecton[pyspark3.2]'
  • To install with Pyspark 3.3: pip install tecton pyspark==3.3

If you are using Rift, you must install the Tecton CLI with the following commands:

pip install 'tecton[rift]'

Overview​

Tecton supports running local unit tests. Unit tests can be defined in feature repositories in file paths matching the pattern **/tests/*.py.

Tests can be run when the following commands are executed:

  • tecton apply: Runs the tests and applies the repo if the tests pass.

  • tecton plan: Runs the tests and shows the changes that would be made to the repo if the changes were applied.

  • tecton test: Runs the tests, only.

Test Pandas or Python-Based Feature Views​

Testing a Feature View with Pandas or Python transformations is straightforward All we need is the Feature View and a test file located in the tests/ directory.

For example, consider a Feature View that determines if a transaction amount is high:

from tecton import RequestSource, on_demand_feature_view
from tecton.types import Field, Float64, Int64
import pandas

# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])

# Define the output schema
output_schema = [Field("transaction_amount_is_high", Int64)]

# This On-Demand Feature View evaluates a transaction amount and declares it as "high", if it's higher than 10,000
@on_demand_feature_view(
sources=[transaction_request],
mode="pandas",
schema=output_schema,
description="Whether the transaction amount is considered high (over $10000)",
)
def transaction_amount_is_high(transaction_request: pandas.DataFrame):
import pandas as pd

df = pd.DataFrame()
df["transaction_amount_is_high"] = (transaction_request["amount"] >= 10000).astype("int64")
return df

With the above Feature View, we can define the unit test that mocks up some sample inputs and asserts that we're getting the expected result.

### tests/transaction_amount_is_high.py ###
from fraud.features.on_demand_feature_views.transaction_amount_is_high import transaction_amount_is_high
import pandas

# Testing the 'transaction_amount_is_high' feature which depends on request data ('amount') as input
def test_transaction_amount_is_high():
transaction_request = pandas.DataFrame({"amount": [124, 10001, 34235436234]})

actual = transaction_amount_is_high.run_transformation(transaction_request=transaction_request)
expected = pandas.DataFrame({"transaction_amount_is_high": [0, 1, 1]})

pandas.testing.assert_frame_equal(actual, expected)

Test Spark-based Feature Views​

Creating a unit test in a PySpark or Spark SQL feature view is similar to the above example, except that we also need to install the Java Development Kit (JDK) locally and provide a SparkSession in the test code.

tecton test will not connect to Tecton's backend APIs so it can be run in a hermetic environment. Tecton ensures that any tested feature transformation code is serialized and deserialized to ensure that executed code is representative of code that will be executed in materialization.

Installing the JDK​

Unit tests for a PySpark or Spark SQL Feature View require JDK version 8 (u201 or later) or JDK version 11 to run.

caution

Older versions of the JDK are provided by Oracle to help developers debug issues in older systems. They are not updated with the latest security patches and are not recommended for use in production.

Unit tests using a PySpark or Spark SQL Feature View are not supported in environments that have Databricks Connect installed.

Install the JDK and then set the JAVA_HOME environment variable.

Supported Methods​

In unit tests, the following methods are supported:

  • On-Demand Feature Views: test_run
  • Batch and Stream Feature Views: test_run, run_transformation, get_features_in_range, and get_features_for_events.

Examples​

For example, consider a Feature View that determines if a user has good credit:

### user_has_good_credit.py ###
from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.credit_scores_batch import credit_scores_batch
from datetime import datetime, timedelta


@batch_feature_view(
sources=[FilteredSource(source=credit_scores_batch)],
entities=[user],
mode="spark_sql",
online=True,
offline=True,
feature_start_time=datetime(2021, 1, 1),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=120),
)
def user_has_good_credit(credit_scores):
return f"""
SELECT
user_id,
credit_score > 670 as user_has_good_credit,
timestamp
FROM
{credit_scores}
"""

Because this is a Spark SQL Feature View, we'll need a SparkSession to test. Tecton provides the tecton_pytest_spark_session pytest fixture. This fixture creates a SparkSession.

Finally, we can define the actual unit test that mocks up some sample inputs, and asserts that we're getting the expected result.

You should ensure that the mock data schema exactly matches the source schema. Any datetime partition columns that may be present need to match, too.

Use .run() in a unit test​

from datetime import datetime, timedelta

import pandas

from fraud.features.batch_feature_views.user_has_good_credit import user_has_good_credit


def test_user_credit_card_issuer(tecton_pytest_spark_session):
input_pandas_df = pandas.DataFrame(
{
"user_id": ["user_1", "user_2", "user_3"],
"signup_timestamp": [datetime(2022, 5, 1)] * 3,
"credit_score": [600, 670, 700],
}
)
input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

# Simulate materializing features for May 1st.
output = user_has_good_credit.test_run(
start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), credit_scores=input_spark_df
)

actual = output.to_pandas()

expected = pandas.DataFrame(
{
"user_id": ["user_1", "user_2", "user_3"],
"timestamp": [datetime(2022, 5, 1)] * 3,
"user_has_good_credit": [False, False, True],
}
)

pandas.testing.assert_frame_equal(actual, expected)

Just like in the example above, this test will now run when we execute tecton apply, tecton plan, or tecton test.

Use get_features_for_events() in a unit test​

note

The ability to run get_features_for_events (Previously get_historical_features) in a unit test was introduced in Tecton SDK 0.7 and does not work in prior versions.

def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session):
input_pandas_df = pandas.DataFrame(
{
"user_id": ["user_1", "user_2", "user_3", "user_4"],
"signup_timestamp": [datetime(2022, 5, 1)] * 4,
"cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
}
)
input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

events = pandas.DataFrame(
{
"user_id": ["user_1", "user_1", "user_2", "user_not_found"],
"timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
}
)

# Simulate materializing features for May 1st.
output = user_credit_card_issuer.get_features_for_events(events, mock_inputs={"fraud_users_batch": input_spark_df})

actual = output.to_pandas()

expected = pandas.DataFrame(
{
"user_id": ["user_1", "user_1", "user_2", "user_not_found"],
"timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
"user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None],
}
)

# NOTE: because the Spark join has non-deterministic ordering, it is important to
# sort the dataframe to avoid test flakes.
actual = actual.sort_values(["user_id", "timestamp"]).reset_index(drop=True)
expected = expected.sort_values(["user_id", "timestamp"]).reset_index(drop=True)

pandas.testing.assert_frame_equal(actual, expected)

Configure the Local Test Spark Session​

Tecton provides a Pytest session-scoped tecton_pytest_spark_session fixture. However, that Spark session may not be configured correctly for your tests. In that case, you may either configure the Tecton-provided fixture or create your own Spark session.

Here's an example of configuring the Tecton-provided Spark session:

import pytest


@pytest.fixture(scope="module", autouse=True)
def configure_spark_session(tecton_pytest_spark_session):
# Custom configuration for the spark session.
tecton_pytest_spark_session.conf.set("spark.sql.session.timeZone", "UTC")

Here's an example of how to create your own Spark session and provide it to Tecton:

from importlib import resources


@pytest.fixture(scope="session")
def my_custom_spark_session():
"""Returns a custom spark session configured for use in Tecton unit testing."""
with resources.path("tecton_spark.jars", "tecton-udfs-spark-3.jar") as path:
tecton_udf_jar_path = str(path)

spark = (
SparkSession.builder.appName("my_custom_spark_session")
.config("spark.jars", tecton_udf_jar_path)
# This short-circuit's Spark's attempt to auto-detect a hostname for the master address, which can lead to
# errors on hosts with "unusual" hostnames that Spark believes are invalid.
.config("spark.driver.host", "localhost")
.config("spark.sql.session.timeZone", "UTC")
.getOrCreate()
)
try:
tecton.set_tecton_spark_session(spark)
yield spark
finally:
spark.stop()

Test Snowflake-based Feature Views (in Rift)​

When using Rift and unit testing a Feature View with a Snowflake Data Source or mode=snowflake_sql, a SnowflakeContext should be provided.

Here's an example of configuring SnowflakeContext:

@pytest.fixture(scope="module", autouse=True)
def configure_snowflake_session():
# connection_parameters assumes the Snowflake connection credentials are stored in the environment
# variables `SNOWFLAKE_USER`,`SNOWFLAKE_PASSWORD` and `SNOWFLAKE_ACCOUNT`.
connection_parameters = {
"user": os.environ["SNOWFLAKE_USER"], # Your username in the Snowflake account that you're using with Tecton
"password": os.environ[
"SNOWFLAKE_PASSWORD"
], # Your password in the Snowflake account that you're using with Tecton. Not needed if using the authenticator parameter above.
"account": os.environ[
"SNOWFLAKE_ACCOUNT"
], # The Snowflake account you're using with Tecton (takes the form \<SNOWFLAKE_ACCOUNT\>.snowflakecomputing.com
"warehouse": "TECTON_WAREHOUSE", # The Snowflake warehouse you're using to run unit test query
# Database and schema are required to create various temporary objects by tecton
"database": "TECTON",
"schema": "PUBLIC",
}
conn = snowflake.connector.connect(**connection_parameters)
# Tecton will use this Snowflake connection for all interactive queries
tecton.snowflake_context.set_connection(conn)


def test_user_credit_card_issuer_ghf():
input_pandas_df = pandas.DataFrame(
{
"user_id": ["user_1", "user_2", "user_3", "user_4"],
"signup_timestamp": [datetime(2022, 5, 1)] * 4,
"cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
}
)

events_df = pandas.DataFrame(
{
"user_id": ["user_1", "user_1", "user_2", "user_not_found"],
"timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
}
)

# Simulate materializing features for May 1st.
output = user_credit_card_issuer.get_features_for_events(
events_df, mock_inputs={"fraud_users_batch": input_pandas_df}
)

actual = output.to_pandas()

expected = pandas.DataFrame(
{
"user_id": ["user_1", "user_1", "user_2", "user_not_found"],
"timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
"user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None],
}
)

# NOTE: It is important to sort the dataframe to avoid test flakes.
actual = actual.sort_values(["user_id", "timestamp"]).reset_index(drop=True)
expected = expected.sort_values(["user_id", "timestamp"]).reset_index(drop=True)

pandas.testing.assert_frame_equal(actual, expected)

Skip Tests​

Specifying the --skip-tests flag when running tecton apply, tecton plan, or tecton test will skip execution of Tecton tests.

Was this page helpful?

🧠 Hi! Ask me anything about Tecton!

Floating button icon