Version: Beta 🚧

Unit Testing for Spark-based Feature Views

To unit test a PySpark or Spark SQL Feature View, you need to install the Java Development Kit (JDK) locally and provide a SparkSession in the test code.

Installing the JDK

Unit tests for a PySpark or Spark SQL Feature View require JDK version 8 (u201 or later) or JDK version 11 to run.
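To check an installed JDK against this requirement, you can parse the version string that java -version reports (it writes this to stderr). The sketch below is illustrative and not part of Tecton. The helper names are hypothetical, and it assumes the common formats 1.8.0_<update> for JDK 8 and <major>.<minor>.<patch> for JDK 9 and later:

```python
import re


def parse_java_version(output: str) -> str:
    """Extract the quoted version from `java -version` output."""
    # Typical first line: openjdk version "11.0.21" 2023-10-17
    match = re.search(r'version "([^"]+)"', output)
    if match is None:
        raise ValueError(f"no Java version found in {output!r}")
    return match.group(1)


def is_supported_jdk(version: str) -> bool:
    """True for JDK 8 update 201 or later, or any JDK 11 release."""
    # JDK 8 reports the legacy scheme "1.8.0_<update>", e.g. "1.8.0_201".
    legacy = re.match(r"1\.8\.0_(\d+)", version)
    if legacy:
        return int(legacy.group(1)) >= 201
    # JDK 9 and later start with the major version, e.g. "11.0.21".
    major = re.match(r"(\d+)", version)
    return major is not None and int(major.group(1)) == 11


print(is_supported_jdk(parse_java_version('openjdk version "11.0.21" 2023-10-17')))  # True
```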

Caution

Older versions of the JDK are provided by Oracle to help developers debug issues in older systems. They are not updated with the latest security patches and are not recommended for use in production.

Unit tests using a PySpark or Spark SQL Feature View are not supported in environments that have Databricks Connect installed.

Install the JDK and then set the JAVA_HOME environment variable.
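One way to confirm the setup is to check that $JAVA_HOME/bin/java actually exists. The following helper is a hypothetical sketch, not a Tecton API. It resolves the launcher and fails with a readable message, so you might call it from a conftest.py before any Spark session is created:

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def java_executable(env: Optional[Mapping[str, str]] = None) -> Path:
    """Return $JAVA_HOME/bin/java, raising if JAVA_HOME is unset or wrong."""
    # Default to the real process environment; tests can pass a plain dict.
    env = os.environ if env is None else env
    java_home = env.get("JAVA_HOME")
    if not java_home:
        raise RuntimeError("JAVA_HOME is not set")
    # On Windows the launcher is java.exe; elsewhere it is plain java.
    name = "java.exe" if os.name == "nt" else "java"
    candidate = Path(java_home) / "bin" / name
    if not candidate.is_file():
        raise RuntimeError(f"JAVA_HOME does not contain {candidate}")
    return candidate
```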

Examples

For example, consider a Feature View that determines if a user has good credit:

### user_has_good_credit.py ###
from tecton import batch_feature_view, Attribute
from tecton.types import Bool
from fraud.entities import user
from fraud.data_sources.credit_scores_batch import credit_scores_batch
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[credit_scores_batch],
    entities=[user],
    timestamp_field="timestamp",
    features=[Attribute("user_has_good_credit", Bool)],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=120),
)
def user_has_good_credit(credit_scores):
    return f"""
        SELECT
            user_id,
            credit_score > 670 as user_has_good_credit,
            timestamp
        FROM
            {credit_scores}
        """

Because this is a Spark SQL Feature View, we'll need a SparkSession to test it. Tecton provides the tecton_pytest_spark_session pytest fixture, which creates a SparkSession for your tests.

Finally, we can define a unit test that creates mock input data, evaluates the Feature View, and asserts that the output matches the expected result.
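To work out the expected result by hand, it can help to restate the Feature View's SQL expression in plain Python. The mirror below is illustrative only and not part of Tecton. Its main point is that the comparison is strict, so a score of exactly 670 does not count as good credit:

```python
# The threshold from the Feature View's SQL: credit_score > 670.
GOOD_CREDIT_THRESHOLD = 670


def has_good_credit(credit_score: int) -> bool:
    """Plain-Python mirror of the Feature View's SQL expression (illustrative only)."""
    # Strict comparison: a score of exactly 670 does not count as good credit.
    return credit_score > GOOD_CREDIT_THRESHOLD


print([has_good_credit(score) for score in [600, 670, 700]])  # [False, False, True]
```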

Make sure the mock data schema exactly matches the source schema, including any datetime partition columns the source may have.
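One way to catch a mismatch early is to compare the mock DataFrame's column types with the schema you expect before running the transformation. PySpark's DataFrame.dtypes returns a list of (column name, type string) pairs, and the hypothetical helper below compares such a list against a dict. The schema shown is an assumed example for credit_scores_batch, not its actual definition:

```python
from typing import Dict, List, Sequence, Tuple

# Hypothetical schema for the credit_scores_batch source; replace it with the
# real column names and Spark type strings of your data source.
EXPECTED_CREDIT_SCORES_SCHEMA: Dict[str, str] = {
    "user_id": "string",
    "credit_score": "bigint",
    "timestamp": "timestamp",
}


def schema_mismatches(
    actual: Sequence[Tuple[str, str]], expected: Dict[str, str]
) -> List[str]:
    """List differences between (name, type) pairs and an expected schema.

    `actual` has the shape of PySpark's DataFrame.dtypes.
    """
    actual_map = dict(actual)
    problems = []
    for name, dtype in expected.items():
        if name not in actual_map:
            problems.append(f"missing column {name!r}")
        elif actual_map[name] != dtype:
            problems.append(f"{name!r} is {actual_map[name]}, expected {dtype}")
    # Sorted so the report is deterministic.
    for name in sorted(actual_map.keys() - expected.keys()):
        problems.append(f"unexpected column {name!r}")
    return problems
```

In a test, assert not schema_mismatches(credit_scores_df.dtypes, EXPECTED_CREDIT_SCORES_SCHEMA) fails with a readable list of differences. This happens, for instance, when a mock frame names its time column signup_timestamp instead of timestamp.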

Use .run_transformation() in a unit test

from datetime import datetime

import pandas

from fraud.features.batch_feature_views.user_has_good_credit import user_has_good_credit


def test_user_has_good_credit(tecton_pytest_spark_session):
    # Mock rows for the credit_scores source. The columns must match the source
    # schema, including the "timestamp" column the transformation selects.
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3"],
            "timestamp": [datetime(2022, 5, 1)] * 3,
            "credit_score": [600, 670, 700],
        }
    )
    credit_scores_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    # Simulate materializing features for May 1st.
    output = user_has_good_credit.run_transformation(
        start_time=datetime(2022, 5, 1),
        end_time=datetime(2022, 5, 2),
        mock_inputs={"credit_scores": credit_scores_df},
    )

    actual = output.to_pandas()

    # Columns are listed in the order the SQL SELECT produces them. A score of
    # exactly 670 is not "good" because the comparison is strict.
    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3"],
            "user_has_good_credit": [False, False, True],
            "timestamp": [datetime(2022, 5, 1)] * 3,
        }
    )

    pandas.testing.assert_frame_equal(actual, expected)

This test runs when you execute tecton apply, tecton plan, or tecton test.

Use get_features_for_events() in a unit test

from datetime import datetime

import pandas

# Assumed module path; point this at wherever the user_credit_card_issuer
# Feature View is defined in your repository.
from fraud.features.batch_feature_views.user_credit_card_issuer import user_credit_card_issuer


def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session):
    # Mock rows for the fraud_users_batch source.
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
        }
    )
    input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    # Events to look up features for; "user_not_found" has no matching mock row.
    events = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_not_found"],
            "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
        }
    )

    # Compute point-in-time feature values for each event, using the mock data
    # in place of the fraud_users_batch source.
    output = user_credit_card_issuer.get_features_for_events(events, mock_inputs={"fraud_users_batch": input_spark_df})

    actual = output.to_pandas()

    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_1", "user_2", "user_not_found"],
            "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
            "user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None],
        }
    )

    # NOTE: because the Spark join has non-deterministic ordering, it is important to
    # sort both DataFrames to avoid test flakes.
    actual = actual.sort_values(["user_id", "timestamp"]).reset_index(drop=True)
    expected = expected.sort_values(["user_id", "timestamp"]).reset_index(drop=True)

    pandas.testing.assert_frame_equal(actual, expected)
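The expected values reflect a point-in-time join, in which an event only sees feature data from before its timestamp. In the expected output, user_1's event at exactly 2022-05-01 gets None, while the next day's event gets "other". The stdlib sketch below is a simplified model of that lookup, not Tecton's implementation. It assumes a feature row is visible only to strictly later events and ignores TTL and batch-schedule alignment:

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# (user_id, feature timestamp, feature value)
FeatureRow = Tuple[str, datetime, str]
# (user_id, event timestamp)
Event = Tuple[str, datetime]


def point_in_time_lookup(
    features: List[FeatureRow], events: List[Event]
) -> List[Optional[str]]:
    """Return, for each event, the latest feature value with an earlier timestamp."""
    by_user: Dict[str, List[Tuple[datetime, str]]] = {}
    for user_id, ts, value in features:
        by_user.setdefault(user_id, []).append((ts, value))

    results: List[Optional[str]] = []
    for user_id, event_ts in events:
        # Only rows strictly before the event are visible, so no "future"
        # feature value leaks into the event. TTL is ignored in this model.
        visible = [row for row in by_user.get(user_id, []) if row[0] < event_ts]
        results.append(max(visible, key=lambda row: row[0])[1] if visible else None)
    return results


features = [
    ("user_1", datetime(2022, 5, 1), "other"),
    ("user_2", datetime(2022, 5, 1), "Visa"),
]
events = [
    ("user_1", datetime(2022, 5, 1)),
    ("user_1", datetime(2022, 5, 2)),
    ("user_2", datetime(2022, 6, 1)),
    ("user_not_found", datetime(2022, 6, 1)),
]
print(point_in_time_lookup(features, events))  # [None, 'other', 'Visa', None]
```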

Configure the Local Test Spark Session

Tecton provides a session-scoped pytest fixture, tecton_pytest_spark_session. If its default configuration doesn't suit your tests, you can either configure the Tecton-provided session or create your own Spark session.

Here's an example of configuring the Tecton-provided Spark session:

import pytest


@pytest.fixture(scope="module", autouse=True)
def configure_spark_session(tecton_pytest_spark_session):
    # Custom configuration for the Spark session.
    tecton_pytest_spark_session.conf.set("spark.sql.session.timeZone", "UTC")

Here's an example of how to create your own Spark session and provide it to Tecton:

from importlib import resources

import pytest
import tecton
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def my_custom_spark_session():
    """Returns a custom Spark session configured for use in Tecton unit testing."""
    # Locate the Tecton UDF jar bundled with the tecton_spark package.
    with resources.path("tecton_spark.jars", "tecton-udfs-spark-3.jar") as path:
        tecton_udf_jar_path = str(path)

    spark = (
        SparkSession.builder.appName("my_custom_spark_session")
        .config("spark.jars", tecton_udf_jar_path)
        # This short-circuits Spark's attempt to auto-detect a hostname for the master address, which can lead to
        # errors on hosts with "unusual" hostnames that Spark believes are invalid.
        .config("spark.driver.host", "localhost")
        .config("spark.sql.session.timeZone", "UTC")
        .getOrCreate()
    )
    try:
        # Provide this session to Tecton for the duration of the test session.
        tecton.set_tecton_spark_session(spark)
        yield spark
    finally:
        spark.stop()
