Unit Tests

If you are using Spark and will be running unit tests, the Tecton CLI must be installed using one of the following commands:

- To install with PySpark 3.1: pip install 'tecton[pyspark]'
- To install with PySpark 3.2: pip install 'tecton[pyspark3.2]'
- To install with PySpark 3.3: pip install tecton pyspark==3.3

If you are using Rift, install the Tecton CLI with the following command:

- pip install 'tecton[rift]'
Feature retrieval methods such as get_features_in_range and run_transformation use validated Tecton Objects during retrieval by default. Validation requires access to the Tecton API and a compute cluster, which are often unavailable in unit testing environments. To use these methods with mock inputs and without validation in unit tests, set the validation mode to skip using:

conf.set("TECTON_SKIP_OBJECT_VALIDATION", "True")
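For example, validation can be skipped once for the whole test suite from a conftest.py file (conftest.py is the standard pytest convention; the environment-variable form below is an assumption based on how other Tecton configuration keys can be supplied, and it must be set before the Tecton SDK is imported):

```python
import os

# conftest.py sketch: skip Tecton object validation for the whole test suite.
# Assumes TECTON_SKIP_OBJECT_VALIDATION can be supplied as an environment
# variable before the Tecton SDK is imported; otherwise, call
# conf.set("TECTON_SKIP_OBJECT_VALIDATION", "True") after importing tecton.
os.environ["TECTON_SKIP_OBJECT_VALIDATION"] = "True"
```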
Overview
Tecton supports unit testing to ensure the correctness of feature definitions. Unit tests can be defined in feature repositories in file paths matching the pattern **/tests/*.py.
Tests can be run when the following commands are executed:

- tecton apply: Runs the tests and applies the repo if the tests pass.
- tecton plan: Runs the tests and shows the changes that would be made to the repo if the changes were applied.
- tecton test: Only runs the tests.
- pytest: See the Running Tests Using pytest Directly section for more information.
Configuring Compute Mode for Unit Tests
Normally the compute mode (Rift or Spark) is determined using defaults configured on the Tecton server. However, for tests running without access to the server, the compute mode can be configured using the --default-compute-mode argument. By default it is set to spark. To test with Rift, use the following command:

tecton test --default-compute-mode rift
Running Tests Using pytest Directly
While tecton test is the recommended way to run unit tests, tests can also be run as part of a larger test suite using pytest. In that case, the compute mode will need to be set using the TECTON_OFFLINE_RETRIEVAL_COMPUTE_MODE and TECTON_BATCH_COMPUTE_MODE environment variables:

TECTON_OFFLINE_RETRIEVAL_COMPUTE_MODE=rift TECTON_BATCH_COMPUTE_MODE=rift pytest
Test Pandas or Python-Based Feature Views
Testing a Feature View with Pandas or Python transformations is straightforward. All we need is the Feature View and a test file located in the tests/ directory.
For example, consider a Feature View that determines if a transaction amount is high:
from tecton import RequestSource, realtime_feature_view, Attribute
from tecton.types import Field, Float64, Int64
import pandas

# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])

# Define the output features
output_features = [Attribute("transaction_amount_is_high", Int64)]


# This Realtime Feature View evaluates a transaction amount and declares it "high" if it's 10,000 or more
@realtime_feature_view(
    sources=[transaction_request],
    mode="pandas",
    features=output_features,
    description="Whether the transaction amount is considered high (over $10000)",
)
def transaction_amount_is_high(transaction_request: pandas.DataFrame):
    import pandas as pd

    df = pd.DataFrame()
    df["transaction_amount_is_high"] = (transaction_request["amount"] >= 10000).astype("int64")
    return df
With the above Feature View, we can define the unit test that mocks up some sample inputs and asserts that we're getting the expected result.
### tests/transaction_amount_is_high.py ###
from fraud.features.realtime_feature_views.transaction_amount_is_high import transaction_amount_is_high
import pandas


# Testing the 'transaction_amount_is_high' feature which depends on request data ('amount') as input
def test_transaction_amount_is_high():
    transaction_request = pandas.DataFrame({"amount": [124, 10001, 34235436234]})

    actual = transaction_amount_is_high.run_transformation({"transaction_request": transaction_request}).to_pandas()

    expected = pandas.DataFrame({"transaction_amount_is_high": [0, 1, 1]})

    pandas.testing.assert_frame_equal(actual, expected)
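If the Feature View were written with mode="python" instead, the transformation would receive and return plain dictionaries rather than DataFrames. As a rough sketch of the same threshold logic and its test (the function below is illustrative stand-in code, not part of the Tecton API):

```python
# Illustrative stand-in for a mode="python" transformation body: one request
# dictionary in, one feature dictionary out.
def transaction_amount_is_high_py(transaction_request: dict) -> dict:
    return {"transaction_amount_is_high": int(transaction_request["amount"] >= 10000)}


def test_transaction_amount_is_high_py():
    assert transaction_amount_is_high_py({"amount": 124}) == {"transaction_amount_is_high": 0}
    assert transaction_amount_is_high_py({"amount": 10001}) == {"transaction_amount_is_high": 1}
```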
Test Spark-Based Feature Views
Creating a unit test for a PySpark or Spark SQL Feature View is similar to the above example, except that we also need to install the Java Development Kit (JDK) locally and provide a SparkSession in the test code.
tecton test does not connect to Tecton's backend APIs, so it can be run in a hermetic environment. Tecton serializes and deserializes any tested feature transformation code so that the code executed in tests is representative of the code that will be executed during materialization.
Installing the JDK
Unit tests for a PySpark or Spark SQL Feature View require JDK version 8 (u201 or later) or JDK version 11 to run.
Older versions of the JDK are provided by Oracle to help developers debug issues in older systems. They are not updated with the latest security patches and are not recommended for use in production.
Unit tests for a PySpark or Spark SQL Feature View are not supported in environments that have Databricks Connect installed.
Install the JDK and then set the JAVA_HOME environment variable.
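For example, on a Debian/Ubuntu host with OpenJDK 11 installed (the path below is illustrative and varies by platform and distribution):

```shell
# Point JAVA_HOME at the JDK installation; adjust the path for your system.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH="$JAVA_HOME/bin:$PATH"
```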
Supported Methods
In unit tests, the following methods are supported:

- Realtime Feature Views: run_transformation.
- Batch and Stream Feature Views: run_transformation, get_features_in_range, and get_features_for_events.
Examples
For example, consider a Feature View that determines if a user has good credit:
### user_has_good_credit.py ###
from tecton import batch_feature_view, Attribute
from tecton.types import Bool
from fraud.entities import user
from fraud.data_sources.credit_scores_batch import credit_scores_batch
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[credit_scores_batch],
    entities=[user],
    timestamp_field="timestamp",
    features=[Attribute("user_has_good_credit", Bool)],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=120),
)
def user_has_good_credit(credit_scores):
    return f"""
        SELECT
            user_id,
            credit_score > 670 as user_has_good_credit,
            timestamp
        FROM
            {credit_scores}
        """
Because this is a Spark SQL Feature View, we'll need a SparkSession to test it. Tecton provides the tecton_pytest_spark_session pytest fixture, which creates a SparkSession.
Finally, we can define the actual unit test that mocks up some sample inputs and asserts that we're getting the expected result.
Ensure that the mock data schema exactly matches the source schema. Any datetime partition columns that may be present need to match, too.
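For instance, if the batch source were backed by a table with a date partition column (here called ds, an assumed name for illustration), the mock DataFrame would need to include it alongside the data columns:

```python
from datetime import datetime

import pandas

# Hypothetical mock input: the columns, including the assumed "ds" partition
# column, must exactly mirror the source schema, or the transformation under
# test may fail or read no rows.
mock_credit_scores = pandas.DataFrame(
    {
        "user_id": ["user_1", "user_2"],
        "credit_score": [650, 700],
        "timestamp": [datetime(2022, 5, 1)] * 2,
        "ds": ["2022-05-01", "2022-05-01"],
    }
)
```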
Use .run_transformation() in a unit test
from datetime import datetime, timedelta

import pandas

from fraud.features.batch_feature_views.user_has_good_credit import user_has_good_credit


def test_user_has_good_credit(tecton_pytest_spark_session):
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3"],
            "timestamp": [datetime(2022, 5, 1)] * 3,
            "credit_score": [600, 670, 700],
        }
    )
    credit_scores_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    # Simulate materializing features for May 1st.
    output = user_has_good_credit.run_transformation(
        start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), mock_inputs={"credit_scores": credit_scores_df}
    )
    actual = output.to_pandas()

    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3"],
            "timestamp": [datetime(2022, 5, 1)] * 3,
            "user_has_good_credit": [False, False, True],
        }
    )

    pandas.testing.assert_frame_equal(actual, expected)
Just like in the example above, this test will now run when we execute tecton apply, tecton plan, or tecton test.
Use get_features_for_events() in a unit test
The ability to run get_features_for_events (previously get_historical_features) in a unit test was introduced in Tecton SDK 0.7 and does not work in prior versions.
from datetime import datetime

import pandas

# This test exercises a separate user_credit_card_issuer Feature View; the import
# path assumed here follows the sample repo layout used above.
from fraud.features.batch_feature_views.user_credit_card_issuer import user_credit_card_issuer


def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session):
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
        }
    )
    input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    events = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_not_found"],
            "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
        }
    )

    # Retrieve features for the events dataframe, mocking out the batch source.
    output = user_credit_card_issuer.get_features_for_events(events, mock_inputs={"fraud_users_batch": input_spark_df})
    actual = output.to_pandas()

    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_not_found"],
            "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
            "user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None],
        }
    )

    # NOTE: because the Spark join has non-deterministic ordering, it is important to
    # sort the dataframe to avoid test flakes.
    actual = actual.sort_values(["user_id", "timestamp"]).reset_index(drop=True)
    expected = expected.sort_values(["user_id", "timestamp"]).reset_index(drop=True)

    pandas.testing.assert_frame_equal(actual, expected)
Configure the Local Test Spark Session
Tecton provides a Pytest session-scoped tecton_pytest_spark_session fixture. However, that Spark session may not be configured correctly for your tests. In that case, you may either configure the Tecton-provided fixture or create your own Spark session.
Here's an example of configuring the Tecton-provided Spark session:
import pytest


@pytest.fixture(scope="module", autouse=True)
def configure_spark_session(tecton_pytest_spark_session):
    # Custom configuration for the spark session.
    tecton_pytest_spark_session.conf.set("spark.sql.session.timeZone", "UTC")
Here's an example of how to create your own Spark session and provide it to Tecton:
from importlib import resources

import pytest
import tecton
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def my_custom_spark_session():
    """Returns a custom Spark session configured for use in Tecton unit testing."""
    with resources.path("tecton_spark.jars", "tecton-udfs-spark-3.jar") as path:
        tecton_udf_jar_path = str(path)
    spark = (
        SparkSession.builder.appName("my_custom_spark_session")
        .config("spark.jars", tecton_udf_jar_path)
        # This short-circuits Spark's attempt to auto-detect a hostname for the master address, which can
        # lead to errors on hosts with "unusual" hostnames that Spark believes are invalid.
        .config("spark.driver.host", "localhost")
        .config("spark.sql.session.timeZone", "UTC")
        .getOrCreate()
    )
    try:
        tecton.set_tecton_spark_session(spark)
        yield spark
    finally:
        spark.stop()