
Transformations

A Transformation is a Tecton primitive that describes a set of operations on data. The operations are expressed through standard frameworks such as SQL, PySpark, and Pandas. Transformations can be chained, meaning the output of one Transformation can be used as the input to another Transformation.

A Transformation is required when defining a Feature Package, except in the case of a Push Feature Package. When using a Transformation to define a Feature Package, the Transformation's output should contain the set of features that are managed by the Feature Package, along with the join keys and timestamps needed to join those features to others.

Using Transformations provides several benefits:

  • Reusability: You can define a common Transformation — to clean up data, for example — that can be shared across all Features.
  • Feature versioning: If you change a Feature Transformation, the Feature Store increments the version of that feature and ensures that you don't accidentally mix features that were computed using two different implementations.
  • End-to-end lineage tracking and reproducibility: Since Tecton manages Transformations, it can tie feature definitions all the way through to a training data set and a model that's used in production. So if you need to generate features for an older model with new data, Tecton can run the older model's feature Transformations on the new data.
  • Trust: A data scientist is more likely to trust and reuse another's feature if they can examine the code and see how the feature is calculated.

Defining a Transformation

Define Transformations by annotating Python functions with one of the *_transformation decorators.

Types of Transformations

There are three types of Transformation: SQL, PySpark, and Online.

SQL Transformation

Use this Transformation to define a SQL query that is executed on input data represented as a table.

Define a SQL Transformation using the @sql_transformation annotation.
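
For example, a minimal sketch (reusing the purchase_log_vds Virtual Data Source from the examples further down this page; the query and column names are illustrative):

from tecton import sql_transformation
from .data_sources import purchase_log_vds

@sql_transformation(inputs=purchase_log_vds)
def purchase_count_by_user(purchase_log):
    # The input is passed in as a table name and interpolated into the query string.
    return f"SELECT user_id, count(*) AS purchase_count FROM {purchase_log} GROUP BY user_id"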

PySpark Transformation

Use this Transformation to define a PySpark transformation that is executed on input data. This transformation can execute arbitrary Python, including third party libraries as user-defined PySpark functions (if your cluster allows third party libraries).

Define a PySpark Transformation using the @pyspark_transformation annotation.

Online Transformation

Use this Transformation to define a unit of Python code that is executed on data that is provided at request time to a Feature Service. An Online Transformation cannot be applied to batch or streaming data sources.

Define an Online Transformation using the @online_transformation annotation.

Transformation Attributes

All Transformations accept metadata as parameters to their defining decorators. The metadata parameters are owner, family, tags, and description.
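
For example, a sketch attaching metadata to a SQL Transformation's decorator (the values are illustrative, and this assumes tags takes a dictionary of string key/value pairs):

from tecton import sql_transformation
from .data_sources import purchase_log_vds

@sql_transformation(
    inputs=purchase_log_vds,
    owner="jane@example.com",          # illustrative owner
    family="purchases",                # illustrative family name
    tags={"release": "production"},    # assumption: tags is a dict of strings
    description="Counts purchases per user.",
)
def purchase_count_with_metadata(purchase_log):
    return f"SELECT user_id, count(*) AS purchase_count FROM {purchase_log} GROUP BY user_id"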

Batch and Streaming Transformation Attributes

The Transformations used in Batch and Streaming pipelines (@sql_transformation and @pyspark_transformation) accept the following additional attributes as parameters:

  • inputs: The input to this Transformation: either a Virtual Data Source or another Transformation
  • has_context: Whether or not this Transformation takes a Batch Transformation Context argument.

Online Transformation Attributes

Online Transformations (@online_transformation) accept the following additional attributes as parameters:

  • request_context: A RequestContext object that declares the transformation's input schema.
  • output_schema: A PySpark StructType object that defines the expected schema of the Transformation output.

Batch Transformation Context

In some cases, a Transformation may need to know about the time window of feature data that it's operating on. Tecton can optionally provide this data to the Transformation as a MaterializationContext object.

To use this context object in a @sql_transformation or @pyspark_transformation, set the has_context argument of the decorator to True. Tecton will then provide the context object as the first argument to the decorated function when it is invoked. As an example:

from tecton import sql_transformation
from .data_sources import purchase_log_vds

@sql_transformation(inputs=purchase_log_vds, has_context=True)
def purchase_count_transformer(context, purchase_log):
    return f"""
    SELECT
        user_id,
        count(*) as total_purchases,
        to_timestamp('{context.feature_data_end_time}') as timestamp
    FROM
        {purchase_log}
    GROUP BY
        user_id
    """

The context defines which portions of the data are used in the Transformation. Context properties are computed and set by Tecton based on properties of the Feature Package. They are not set by users explicitly.

The available properties of the context are:

  • context.raw_data_start_time: The earliest datetime associated with the raw data
  • context.raw_data_end_time: The latest datetime associated with the raw data
  • context.feature_data_start_time: The earliest datetime that can be associated to the output features of this materialization job
  • context.feature_data_end_time: The latest datetime that can be associated to the output features of this materialization job
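
For example, a sketch (in the style of the query above; the timestamp column name is illustrative) that uses the raw data bounds to restrict which rows are aggregated:

@sql_transformation(inputs=purchase_log_vds, has_context=True)
def windowed_purchase_count_transformer(context, purchase_log):
    return f"""
    SELECT
        user_id,
        count(*) as total_purchases,
        to_timestamp('{context.feature_data_end_time}') as timestamp
    FROM
        {purchase_log}
    WHERE
        timestamp >= to_timestamp('{context.raw_data_start_time}')
        AND timestamp < to_timestamp('{context.raw_data_end_time}')
    GROUP BY
        user_id
    """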

For more information, see the MaterializationContext reference.

Online Transformation Request Context

An @online_transformation requires two inputs:

  • request_context: a RequestContext object declaring the Transformation's input schema
  • output_schema: a StructType object declaring the Transformation's output schema

An Online Transformation should also be designed to support a batch of data passed into it; this allows users to operate on batches of data more efficiently.

from tecton import online_transformation, RequestContext
import pandas
from pyspark.sql.types import StructType, StringType, LongType, StructField

# quote_transform's input schema.
rc = RequestContext(
    schema={"quote_text": StringType()}
)

# quote_transform's output schema.
schema = StructType()
schema.add(StructField("quote_length", LongType()))
schema.add(StructField("quote_words", LongType()))

@online_transformation(request_context=rc, output_schema=schema)
def quote_transform(quote_text: pandas.Series):

    batch = []

    # Process all rows in the batch.
    for s in quote_text:
        # Add the struct for this row to the results.
        batch.append({"quote_length": len(s), "quote_words": len(s.split(" "))})

    # Return a pandas DataFrame.
    return pandas.DataFrame(batch)

Example: Define a Transformation

Define a Transformation by annotating Python functions with one of: @sql_transformation, @pyspark_transformation, or @online_transformation.

The input to the first two decorators, SQL and PySpark, can be another Transformation or a Virtual Data Source.

The following example creates a PySpark Transformation. The function to which the @pyspark_transformation decorator is applied must accept a PySpark DataFrame as its input data.

PySpark Transformation

from tecton import pyspark_transformation
from .data_sources import purchase_log_vds
from pyspark.sql import DataFrame

@pyspark_transformation(inputs=purchase_log_vds)
def quarterly_purchase_count_transformer(purchase_log_df: DataFrame):
    # date_trunc is imported inside the function so it is serialized with the Transformation.
    from pyspark.sql.functions import date_trunc

    return (purchase_log_df
        .select(
            "user_id",
            "product_id",
            date_trunc("quarter", purchase_log_df.timestamp).alias("quarter_end_date"))
        .groupBy("user_id", "product_id", "quarter_end_date")
        .count()
        .withColumnRenamed("count", "buy_count"))

For an example of SQL Transformations, see Creating a SQL or PySpark Feature.

Using Transformations

Interacting with Defined Transformations

Most importantly, Transformations are used as an input to a Feature Package. You can reuse the Transformation in any number of new Feature Packages. See Feature Packages for more details.

It is also possible to retrieve defined Transformations and interact with them directly. This is a useful workflow for debugging. In the following example, we retrieve the quarterly_purchase_count_transformer Transformation and execute it on arbitrary input data.

import tecton
import pandas as pd

transformation = tecton.get_transformation("quarterly_purchase_count_transformer")
df = pd.DataFrame.from_dict({
    "user_id": ["u123"],
    "product_id": ["p123"],
    "timestamp": [pd.Timestamp.now()]
})

transformation.run(df).to_spark()

Run the Transformation with additional context supplied:

from datetime import datetime
import numpy as np

context = tecton.MaterializationContext(
    feature_data_end_time=np.datetime64(datetime.now())
)

transformation.run(df, context=context).to_pandas()

Using Imported Libraries

When Transformations are applied to the Tecton feature repository, only the Transformation function's body is recorded. Therefore, imports and other references from outside the Transformation function's body will not work. To use imported libraries, import them inside the Transformation function, not at the top level of the module as you normally would. See the example below.

Example

from tecton import pyspark_transformation
from .data_sources import prices_vds
from pyspark.sql import DataFrame
# top level imports are not respected by transformations

@pyspark_transformation(inputs=prices_vds)
def ceiling_prices(prices_vds: DataFrame):
    from pyspark.sql import functions # imports inside the function get serialized
    return (prices_vds
        .select(
            "product_id",
            functions.ceil('price_usd')))

Using Functions Defined in the Feature Repository

To use functions defined in the Feature Repository in a Transformation:

  • Annotate the transitive closure of helper functions with @inlined.
  • The inlined functions themselves must follow the rules of Transformations: their imports must be inside the function body, and any feature repo functions they use must also be decorated with @inlined.

Example

from tecton_spark.function_serialization import inlined
from feature_repo.currency_utils import round_jpy # this import must be outside

@inlined
def internal_round(value, precision):
    from pyspark.sql import functions
    return functions.bround(value, precision)

@inlined
def round_currency(currency, value):
    from pyspark.sql import functions
    return (functions.when(currency == 'USD', internal_round(value, 2))
            .when(currency == 'GBP', internal_round(value, 2))
            # note that round_jpy itself must be decorated @inlined
            # but the import cannot be inside the function body
            .when(currency == 'JPY', round_jpy(value, 0))
            .otherwise(1))

@pyspark_transformation(inputs=prices_vds)
def ceiling_prices(prices_vds: DataFrame):
    from pyspark.sql import functions
    return (prices_vds
        .select(
            "product_id",
            round_currency(functions.col('currency'), functions.col('price'))
        )
    )

User Defined Function

Finally, here's an example of a user-defined function (UDF) in a Transformation. See User-defined functions - Python in the Databricks documentation.

@pyspark_transformation(inputs=prices_vds)
def ceiling_prices(prices_vds: DataFrame):
    from pyspark.sql.functions import udf
    from pyspark.sql.types import LongType

    # Register a Python function as a PySpark UDF.
    def squared(s):
        return s * s
    squared_udf = udf(squared, LongType())

    return (prices_vds
        .withColumn("price_squared", squared_udf('price')))