Unit Testing Tecton Transformations

Overview

Plan Hooks provide a framework for executing unit tests on your transformations every time tecton plan or tecton apply is run. A user will only be able to apply their changes if the tests pass.

Plan Hooks can also be run directly using tecton test.

Plan Hooks are written in Python and are therefore completely customizable. In addition to running unit tests, you can use Plan Hooks to enforce a commit policy or run other basic checks against your code.

Enabling Plan Hooks

When tecton init is run to configure a feature repository in a new directory, it creates a folder called .tecton containing the file .tecton/hooks/plan.py. Plan Hooks are enabled by default, but the default configuration is a no-op.

How Plan Hooks Work

Arbitrary logic can be defined in plan.py as long as it adheres to the return code contract for run(). Each time tecton plan or apply is run, it will execute the run() method in plan.py. tecton expects the following return codes when running run():

  • 0 if all tests pass
  • None if no tests were run
  • Non-zero integer in the case of test failures

If a non-zero value is returned from run(), the hook's stdout will be printed to stderr. If 0 or None is returned, all hook output will be suppressed.

In summary, plan hooks must meet the following requirements:

  • Must be defined in .tecton/hooks/plan.py
  • Must contain a run() method that accepts no arguments.
  • run() must return either 0 (tests pass), None (no tests run) or a non-zero integer return code (test failures).

To configure multiple plan hooks, it's recommended to define them in separate functions in plan.py and call each function from run().
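
A minimal sketch of that pattern might look like the following (the individual check functions here are hypothetical placeholders, not part of any Tecton API):

### plan.py ###

from typing import Optional

def check_naming_policy() -> Optional[int]:
    # Hypothetical check: return 0 on success, non-zero on failure.
    return 0

def run_pytest_suite() -> Optional[int]:
    # Hypothetical test runner: return None if no tests were found.
    return None

def run() -> Optional[int]:
    results = [check_naming_policy(), run_pytest_suite()]
    # Fail fast on the first failing hook.
    for code in results:
        if code:  # any non-zero integer indicates a failure
            return code
    # Report 0 if at least one hook ran tests, None if none did.
    return 0 if any(code == 0 for code in results) else None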

Default Plan Hook: plan.py

The default contents of .tecton/hooks/plan.py contain a no-op hook that returns None.

### plan.py ###

from typing import Optional

def run() -> Optional[int]:
    # No-op plan hook that returns None, indicating no tests were run.
    return None

When you run tecton plan, you'll see ✅ Running Tests: No tests found. in the output. For example:

$ tecton plan
Using workspace "prod"
✅ Imported 4 Python modules from the feature repository
✅ Running Tests: No tests found.
✅ Collecting local feature declarations
✅ Performing server-side validation of feature declarations

Running unit tests with pytest

First, we'll use plan.py to configure a test harness that runs pytest against all files in the feature repo matching the pattern *_test.py or test_*.py.

Second, we'll show example unit tests for Online and Spark transformations.

Creating your test harness

The following test harness downloads Spark for local testing, and then executes any files matching our test naming convention. You can copy this example directly into your feature repository.

### plan.py ###

import os
import sys
from pathlib import Path
from typing import Optional
import pytest

SPARK_FILE_NAME = "spark-2.4.8-bin-hadoop2.7"
SPARK_TGZ = f"https://downloads.apache.org/spark/spark-2.4.8/{SPARK_FILE_NAME}.tgz"
VIRTUAL_ENV_PATH = os.getenv("VIRTUAL_ENV")

def is_valid_test_path(path) -> bool:
    _path = str(path.resolve())
    if VIRTUAL_ENV_PATH is not None and _path.startswith(VIRTUAL_ENV_PATH):
        return False
    if '.git/' in _path:
        return False
    if '/.tecton/' in _path:
        return False
    return True

def run() -> Optional[int]:
    # Run pytest on all *_test.py and test_*.py files and return:
    # - 0 if all tests pass
    # - None if no tests were run
    # - Non-zero exit code indicating test failures

    root_path = str(Path().resolve())
    tecton_init = root_path / Path('.tecton')

    assert tecton_init.exists() and tecton_init.is_dir(), "plan.py must be run from a feature repo root initialized using 'tecton init'!"

    spark_path = tecton_init / Path('spark')
    if not spark_path.exists():
        spark_path.mkdir()
        import requests
        import tempfile
        import tarfile
        r = requests.get(SPARK_TGZ)
        with tempfile.TemporaryFile(prefix="spark-", suffix=".tgz") as f:
            f.write(r.content)
            f.seek(0)
            tarf = tarfile.open(fileobj=f, mode='r:gz')
            tarf.extractall(path=str(tecton_init.resolve()))
        new_spark_path = tecton_init / Path(SPARK_FILE_NAME)
        new_spark_path.rename(spark_path)

    tests = []
    tests.extend([str(p.resolve()) for p in Path(root_path).glob("**/*_test.py") if is_valid_test_path(p)])
    tests.extend([str(p.resolve()) for p in Path(root_path).glob("**/test_*.py") if is_valid_test_path(p)])

    if not tests:
        # Avoid calling pytest.main([]) with no arguments, which would
        # collect tests from the current directory by default.
        return None

    exitcode = pytest.main(tests)
    if exitcode == 5:
        # https://docs.pytest.org/en/stable/usage.html#possible-exit-codes
        return None
    return exitcode

if __name__ == "__main__":
    sys.exit(run())
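
Because run() is guarded by a __main__ check, you can also execute the harness directly while iterating on your tests, without running a full plan:

$ python .tecton/hooks/plan.py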

Pandas Transformation Unit Test

Using the test harness provided in the example above, it's possible to define unit tests on pandas transformations. Suppose you have a pandas transformation that simply doubles the values passed to it:

### my_transformations.py ###
import pandas
from tecton import transformation

@transformation(mode="pandas")
def transformation_double(input):
    import pandas

    series = []

    for a in input:
        features = {}
        features["output"] = a * 2
        series.append(features)

    return pandas.DataFrame(series)
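
As an aside, the row-by-row loop above can be written more idiomatically with vectorized pandas operations. A minimal sketch, assuming the input is a pandas Series as in the test below:

### my_transformations.py (vectorized alternative) ###

@transformation(mode="pandas")
def transformation_double(input):
    import pandas

    # Multiply the entire Series at once instead of looping row by row.
    return pandas.DataFrame({"output": input * 2})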

Now, we want to write a test that asserts the output DataFrame's values are in fact double the values provided in the input Series.

### my_transformation_test.py ###

from .my_transformations import transformation_double
import pandas as pd
from pandas.testing import assert_frame_equal


def test_my_favorite_doubling_transformation():
    input = pd.Series([1, 2, 3])
    actual = transformation_double(input)
    expected = pd.DataFrame({"output": [2, 4, 6]})

    assert_frame_equal(actual, expected)

After adding my_transformation_test.py to your repo, you'll see the message ✅ Running Tests: Tests passed! when running tecton plan or tecton apply if all tests pass.

$ tecton plan
Using workspace "prod"
✅ Imported 4 Python modules from the feature repository
✅ Running Tests: Tests passed!
✅ Collecting local feature declarations
✅ Performing server-side validation of feature declarations
 ↓↓↓↓↓↓↓↓↓↓↓↓ Plan Start ↓↓↓↓↓↓↓↓↓↓

  + Create FeatureView
    name:            my_feature_package
    owner:           sally

 ↑↑↑↑↑↑↑↑↑↑↑↑ Plan End ↑↑↑↑↑↑↑↑↑↑↑↑

If tests fail, you'll see ⛔ Running Tests: Tests failed :( along with test failure messages.

$ tecton plan
Using workspace "prod"
✅ Imported 4 Python modules from the feature repository
⛔ Running Tests: Tests failed :(
E   AssertionError: DataFrame.iloc[:, 0] (column name="output") are different
E
E   DataFrame.iloc[:, 0] (column name="output") values are different (33.33333 %)
E   [left]:  [2, 4, 7]
E   [right]: [2, 4, 6]

pandas/_libs/testing.pyx:174: AssertionError
========================================================================================================= short test summary info ==========================================================================================================
FAILED my_transformation_test.py::test_my_favorite_doubling_transformation - AssertionError: DataFrame.iloc[:, 0] (column name="output") are different

Spark Transformation Unit Test

Testing a PySpark or Spark SQL transformation is similar to the above example, except that we also need to provide a SparkSession test fixture.

For example, let's say I have a transformation that calculates the number of impressions an ad had per calendar month.

### ad_impression_count_monthly.py ###
from tecton import transformation

@transformation(mode="pyspark")
def ad_impression_count_monthly_transformer(ad_impressions_batch):
    import pyspark.sql.functions as F
    # Truncate each impression timestamp to the start of its calendar month.
    truncated_date_view = ad_impressions_batch.withColumn('timestamp', F.date_trunc('month', F.col('timestamp')))
    # Count impressions per ad per month.
    return truncated_date_view.groupBy('ad_id', 'timestamp').agg(F.count(F.lit(1)).alias("ad_impression_count"))

Because this is a PySpark transformation, we'll need to create a SparkSession test fixture.

In our conftest.py file:

import findspark
from pyspark.sql import SparkSession
import pytest

@pytest.fixture(scope="session")
def spark_session():
    findspark.init(spark_home='.tecton/spark')
    spark = SparkSession.builder.appName('pytest_spark_session').getOrCreate()

    yield spark

    spark.stop()
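
Note that this fixture assumes pytest and findspark are installed in the environment where tecton plan runs, along with an importable pyspark matching the downloaded Spark distribution. For example:

$ pip install pytest findspark pyspark==2.4.8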

Finally, we can define the actual unit test that mocks up some sample ad impressions, and asserts that we're getting the expected counts.

import datetime
import pyspark
from feature_repo.shared.features.ad_impression_count_monthly import ad_impression_count_monthly_transformer

def test_monthly_impression_count(spark_session):
    mock_data = [
        ('ad_id1', "2020-10-28 05:02:11"),
        ('ad_id1', "2020-10-30 01:00:00"),
        ('ad_id2', "2020-10-28 05:02:11")
    ]
    input = spark_session.createDataFrame(mock_data, ['ad_id', 'timestamp'])

    assert ad_impression_count_monthly_transformer is not None

    # Sort the collected rows by ad_id, since groupBy output order is not guaranteed.
    output = sorted(ad_impression_count_monthly_transformer(input).collect(), key=lambda row: row['ad_id'])

    assert output[0]['ad_id'] == 'ad_id1'
    assert output[0]['ad_impression_count'] == 2

    assert output[1]['ad_id'] == 'ad_id2'
    assert output[1]['ad_impression_count'] == 1

Just like in the examples above, this test will now run when we execute tecton plan.

Other Plan Hook Examples

File Naming Policy Test

As an example of other generic checks you can run with Plan Hooks, suppose you would like to enforce a naming policy that requires all Python files to be prefixed with "ml_ops_". The example below performs this check on all Python files in the feature repository and returns 0 if all names adhere to the policy, or 1 if any do not.

### plan.py ###

from pathlib import Path
from typing import Optional

def run() -> Optional[int]:
    # Run a naming policy check on all python files that checks that
    # all file names begin with "ml_ops_"
    # - 0 if all names adhere to the policy.
    # - 1 (or any non-zero code) if names do not meet the policy.

    root_path = str(Path().resolve())
    py_files = []
    py_files.extend([p.resolve() for p in Path(root_path).glob("**/*.py")])

    bad_names = [p for p in py_files if not p.name.startswith("ml_ops_")]
    if len(bad_names) > 0:
        print("Invalid names:")
        for n in bad_names:
            print(str(n))
        return 1
    return 0

Skip Plan Hooks

Specifying the --skip-tests flag when running tecton plan or apply will skip execution of Plan Hooks.
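
For example:

$ tecton plan --skip-tests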

Reset Plan Hooks

If you get carried away writing customized Plan Hook behavior and want to revert to the default, simply run tecton init --reset-hooks. This will delete the contents of .tecton/ and recreate the default plan.py.
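
$ tecton init --reset-hooks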