Skip to content

Using Third Party Dependencies in Spark UDFs

Overview

You can use third party Python packages in UDFs used by Transformations by declaring them in extra_pip_dependencies within the cluster config object, which is either tecton.NewEMRClusterConfig or tecton.NewDatabricksClusterConfig.

Limitations

The installed packages will be available to use from Spark UDFs but not from the transformation code itself. For example, if you set extra_pip_dependencies=["tensorflow"], you can use tensorflow in a UDF like this:

@pyspark_transformation(inputs=vds)
def test_transformation(transformation_input):
    """Example transformation showing where a third-party package may be imported.

    The package declared in extra_pip_dependencies (here, tensorflow) is only
    importable inside the Spark UDF body, not in the transformation body itself.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    # tensorflow can be used within my_tensorflow_udf,
    # but not outside of it
    def my_tensorflow_udf(x):
        # The import must live here, inside the UDF body; a top-of-file or
        # transformation-level import would fail (see the note below).
        import tensorflow as tf
        return int(tf.math.log1p(float(x)).numpy())

    # Wrap the Python function as a Spark UDF returning an integer column.
    my_tensorflow_udf = F.udf(my_tensorflow_udf, IntegerType())

    ...
Note that the import statement has to be within the UDF body as well. Putting it at the top of the file, or inside the transformation function but outside the UDF, won’t work.

Complete example: using third party python package with EMR

In this example we use TensorFlow's tf.math.log1p function (imported inside the UDF body) in a UDF applied within a PySpark transformation.

from datetime import datetime

from tecton import (
    DataSourceConfig,
    MaterializationConfig,
    NewEMRClusterConfig,
    TemporalFeaturePackage,
    pyspark_transformation,
    sql_transformation,
)

from shared import entities, virtual_data_sources


@pyspark_transformation(inputs=virtual_data_sources.product_views)
def log_num_clicks(transformation_input):
    """Select product_id, log1p of the click count, and the event timestamp.

    The log1p value is computed by a Spark UDF that imports TensorFlow,
    which is made available via extra_pip_dependencies on the cluster config.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    def _log1p_clicks(value):
        # tensorflow is importable only here, inside the UDF body, because
        # extra_pip_dependencies installs it for Spark UDF execution.
        import tensorflow as tf
        return int(tf.math.log1p(float(value)).numpy())

    log1p_udf = F.udf(_log1p_clicks, IntegerType())

    selected_columns = [
        "product_id",
        log1p_udf("clicked").alias("log1p_clicked"),
        F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
    ]
    return transformation_input.select(*selected_columns)

# EMR cluster configuration for materialization. The pinned tensorflow
# version is installed on the cluster so the Spark UDF above can import it.
new_emr = NewEMRClusterConfig(
    extra_pip_dependencies=["tensorflow==2.2.0"],  # available inside UDFs only
    instance_type="m4.xlarge",
    number_of_workers=4,
)

# Feature package wiring the transformation, entity, and the EMR cluster
# config (with tensorflow installed) together.
product_daily_purchase_count = TemporalFeaturePackage(
    name="my_example_temporal_feature_package",
    description="An example TemporalFeaturePackage that computes the logarithm of a feature using Tensorflow",
    family="product_analytics",
    entities=[entities.product],
    transformation=log_num_clicks,
    materialization=MaterializationConfig(
        # Recompute on a weekly cadence over a 7-day lookback window and
        # keep features servable for 7 days.
        schedule_interval="7d",
        serving_ttl="7d",
        data_lookback_period="7d",
        feature_start_time=datetime(year=2020, month=1, day=2),
        offline_enabled=True,
        online_enabled=True,
        # The same cluster config is reused for both batch and stream jobs.
        batch_materialization=new_emr,
        stream_materialization=new_emr,
    ),
)