Version: Beta 🚧

Transformations

Overview

A transformation is a function that specifies logic to run against data retrieved from external data sources.

By default, transformations are inlined into Feature Views. The following example shows a Feature View that implements a transformation in the body of the Feature View function my_feature_view. The transformation runs in spark_sql mode and renames columns from the data source to feature_one and feature_two.

```python
@batch_feature_view(
    mode="spark_sql",
    # ...
)
def my_feature_view(input_data):
    return f"""
        SELECT
            entity_id,
            timestamp,
            column_a AS feature_one,
            column_b AS feature_two
        FROM {input_data}
        """
```

Alternatively, transformations can be defined outside of Feature Views in Tecton objects identified with the @transformation decorator. This allows transformations to be modular, discoverable in Tecton's Web UI, and reusable across multiple Feature Views.

Transformation input and output

Input

The input to a transformation contains the columns in the data source.

Output

When a transformation is defined inside of a Feature View, the output of the transformation is a DataFrame that must include:

  1. The join keys of all entities included in the entities list
  2. A timestamp column. If there is more than one timestamp column, the timestamp_field parameter must be set to specify which column is the correct timestamp of the feature values.
  3. Feature value columns. All columns other than the join keys and timestamp will be considered features in a Feature View.
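The column rules above can be summarized as a small check. The helper below is hypothetical and not part of the Tecton SDK: given an output's column names, the join keys, and the timestamp column, it reports which columns would be treated as features and raises an error if a required column is missing.

```python
from typing import List, Sequence


def feature_columns(
    output_columns: Sequence[str],
    join_keys: Sequence[str],
    timestamp_field: str,
) -> List[str]:
    """Return the columns treated as features (hypothetical helper).

    Raises ValueError if a join key or the timestamp column is missing.
    """
    missing = [c for c in [*join_keys, timestamp_field] if c not in output_columns]
    if missing:
        raise ValueError(f"output is missing required columns: {missing}")
    # Every column that is not a join key or the timestamp is a feature.
    reserved = set(join_keys) | {timestamp_field}
    return [c for c in output_columns if c not in reserved]


cols = ["entity_id", "timestamp", "feature_one", "feature_two"]
print(feature_columns(cols, ["entity_id"], "timestamp"))  # ['feature_one', 'feature_two']
```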

Modes

A transformation mode specifies the format in which a transformation needs to be written. For example, in spark_sql mode, a transformation needs to be written in SQL, while in pyspark mode, a transformation needs to be written using the PySpark DataFrame API.

This page describes the transformation modes that are supported by transformations defined inside and outside of Feature Views.

The examples show transformations defined inside of Feature Views.

Modes for Batch Feature Views and Stream Feature Views

mode="spark_sql" and mode="snowflake_sql"

| Characteristic | Description |
|---|---|
| Summary | Contains a SQL query |
| Supported Feature View types | Batch Feature View, Stream Feature View. mode="snowflake_sql" is not supported in Stream Feature Views. |
| Supported data platforms | Databricks, EMR, Snowflake |
| Input type | A string (the name of a view generated by Tecton) |
| Output type | A string |

Example

```python
@batch_feature_view(
    mode="spark_sql",
    # ...
)
def user_has_good_credit(credit_scores):
    return f"""
        SELECT
            user_id,
            IF (credit_score > 670, 1, 0) as user_has_good_credit,
            date as timestamp
        FROM
            {credit_scores}
        """
```

mode="pyspark"​

CharacteristicDescription
SummaryContains Python code that is executed within a Spark context.
Supported Feature View typesBatch Feature View, Stream Feature View
Supported data platformsDatabricks, EMR
Input typeA Spark DataFrame or a Tecton constant
Output typeA Spark DataFrame
NotesThird party libraries can be included in user-defined PySpark functions if your cluster allows third party libraries.

Example​

@batch_feature_view(
mode="pyspark",
# ...
)
def user_has_good_credit(credit_scores):
from pyspark.sql import functions as F

df = credit_scores.withColumn(
"user_has_good_credit",
F.when(credit_scores["credit_score"] > 670, 1).otherwise(0),
)
return df.select("user_id", df["date"].alias("timestamp"), "user_has_good_credit")

mode="snowpark"​

CharacteristicDescription
SummaryContains Python code that is executed in Snowpark, using the Snowpark API for Python.
Supported Feature View TypesBatch Feature View
Supported data platformsSnowflake
Input typea snowflake.snowpark.DataFrame or a Tecton constant
Output typeA snowflake.snowpark.DataFrame
NotesThe transformation function can call functions that are defined in Snowflake.

Example​

@batch_feature_view(
mode="snowpark",
# ...
)
def user_has_good_credit(credit_scores):
from snowflake.snowpark.functions import when, col

df = credit_scores.withColumn("user_has_good_credit", when(col("credit_score") > 670, 1).otherwise(0))
return df.select("user_id", "user_has_good_credit", "timestamp")

Modes for Realtime Feature Views

mode="pandas"

| Characteristic | Description |
|---|---|
| Summary | Contains Python code that operates on a Pandas DataFrame |
| Supported Feature View types | Realtime Feature View |
| Supported data platforms | Databricks, EMR, Snowflake |
| Input type | A Pandas DataFrame or a Tecton constant |
| Output type | A Pandas DataFrame |

Example

```python
@realtime_feature_view(
    mode="pandas",
    # ...
)
def transaction_amount_is_high(transaction_request):
    import pandas  # imported without an alias, per the import guidelines on this page

    df = pandas.DataFrame()
    df["transaction_amount_is_high"] = (transaction_request["amount"] >= 10000).astype("int64")
    return df
```

mode="python"​

CharacteristicDescription
SummaryContains Python code that operates on a dictionary
Supported Feature View TypesRealtime Feature View
Supported data platformsDatabricks, EMR, Snowflake
Input typeA dictionary
Output typeA dictionary

Example​

@realtime_feature_view(
mode="python",
# ...
)
def user_age(request, user_date_of_birth):
from datetime import datetime, date

request_datetime = datetime.fromisoformat(request["timestamp"]).replace(tzinfo=None)
dob_datetime = datetime.fromisoformat(user_date_of_birth["USER_DATE_OF_BIRTH"])

td = request_datetime - dob_datetime

return {"user_age": td.days}

Defining Transformations Outside of Feature Views

Compared to defining a transformation inside of a Feature View, the main advantages of defining a transformation outside of a Feature View are:

  • Reusability
    • Transformations can be reused by multiple Feature Views.
    • A Feature View can call multiple transformations.
  • Discoverability: Transformations can be searched in the Web UI.

To define a transformation outside of a Feature View, define a function using the @transformation decorator, with mode set to a supported transformation mode. For example:

@transformation(mode="spark_sql")
def my_transformation(input_data):
return f"""
SELECT
entity_id,
timestamp,
column_a AS feature_one,
column_b AS feature_two
FROM {input_data}
"""

In the definition of the Feature View that will use the transformation, specify mode="pipeline", and call the transformation function from the Feature View function.

@batch_feature_view(mode="pipeline", ...)
def my_feature_view(input_data):
return my_transformation(input_data)

Guidelines for using @transformation functions

  • @transformation function outputs can only be referenced inside mode="pipeline" Feature View definitions.
  • A @transformation function cannot reference another @transformation function. Instead, chained invocations should be declared entirely inside a mode="pipeline" Feature View definition. Alternatively, you can convert upstream @transformation functions to native Python functions and reference them inside a @transformation.
  • mode="pipeline" Feature View definitions cannot invoke any PySpark operations. They can only pass DataFrame objects and Tecton const types in and out of upstream @transformations.

An alternative to using a @transformation function

As an alternative to implementing a transformation function using the @transformation decorator, you can use a native Python function. Native Python functions:

  • Can be called from inside Feature View definitions that use any mode.
  • Can be called from @transformation functions.
  • Are not discoverable in the Web UI.

Imports and Dependencies

Importing Python modules into transformations

note

Transformations support only the pandas and numpy modules, and these modules can only be used in Pandas transformations.

Python modules must be imported inside the transformation function.

Avoid using aliases for imports (e.g. use import pandas instead of import pandas as pd).

Any modules used for type annotations in function signatures must be imported outside the function.
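The aliasing rule above is easy to check mechanically. The following helper is hypothetical (not a Tecton tool); it uses Python's standard ast module to list aliased imports, such as import pandas as pd, in a snippet of transformation source so they can be flagged before the code is applied.

```python
import ast
from typing import List


def aliased_imports(source: str) -> List[str]:
    """Return 'module as alias' strings for every aliased import in source."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                if alias.asname is not None:
                    found.append(f"{alias.name} as {alias.asname}")
    return found


snippet = '''
def my_transformation(request):
    import pandas as pd
    import numpy
    return pd.DataFrame()
'''
print(aliased_imports(snippet))  # ['pandas as pd']
```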

In the following example, the pandas module is imported in two places:

  • Inside of the transformation function, because the function uses the pandas module
  • Outside of the transformation function, because pandas type annotations are used in the function signature (my_transformation(request: pandas.DataFrame) -> pandas.DataFrame)

```python
from tecton import transformation
import pandas  # required for type hints on my_transformation.


@transformation(mode="pandas")
def my_transformation(request: pandas.DataFrame) -> pandas.DataFrame:
    import pandas  # required for pandas.DataFrame() below.

    df = pandas.DataFrame()
    df["amount_is_high"] = (request["amount"] >= 10000).astype("int64")
    return df
```

Importing Python objects into transformation functions

Object imports must be done outside of the transformation definition.

The following imports of objects into transformation functions are allowed:

  • Functions
  • Constants

The following imports of objects into transformation functions are not allowed:

  • Classes
  • Class instances
  • Enums

In the following example, my_func, my_int_const, my_string_const, and my_dict_const are imported from my_local_module. The import takes place outside of the transformation function.

```python
from tecton import transformation
import pandas  # required for type hints on my_transformation.
from my_local_module import my_func, my_int_const, my_string_const, my_dict_const


@transformation(mode="pandas")
def my_transformation(request: pandas.DataFrame) -> pandas.DataFrame:
    import pandas  # required for pandas.DataFrame() below.

    df = pandas.DataFrame()
    df[my_dict_const["resultval"]] = my_func(request[my_string_const] >= my_int_const)
    return df
```
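The allowed and disallowed kinds of imported objects listed above can be expressed as a rough check. This is a hypothetical sketch, not how Tecton validates imports, and it assumes that "constant" means a value of a built-in scalar or container type (int, float, str, bool, dict, list, tuple); the exact definition Tecton uses is not stated on this page.

```python
import enum
import inspect

# Assumption: values of these built-in types count as "constants".
CONSTANT_TYPES = (int, float, str, bool, dict, list, tuple)


def is_allowed_import(obj) -> bool:
    """Rough classification of an imported object (hypothetical)."""
    if inspect.isclass(obj):  # classes, including Enum classes
        return False
    if isinstance(obj, enum.Enum):  # enum members
        return False
    if inspect.isfunction(obj):  # plain Python functions
        return True
    # Built-in constants are allowed; any other object is a class instance.
    return type(obj) in CONSTANT_TYPES


class Color(enum.Enum):
    RED = 1


def my_func(x):
    return x


print(is_allowed_import(my_func), is_allowed_import(10000))  # True True
print(is_allowed_import(Color), is_allowed_import(Color.RED))  # False False
```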

Using Third-Party Dependencies in PySpark Transformations

Overview

You can use third-party Python packages in UDFs used by PySpark transformations (mode="pyspark") by declaring them in extra_pip_dependencies within the cluster config object, either tecton.EMRClusterConfig or tecton.DatabricksClusterConfig. For example:

```python
from tecton import EMRClusterConfig

new_emr = EMRClusterConfig(
    instance_type="m4.xlarge",
    number_of_workers=4,
    extra_pip_dependencies=["tensorflow==2.2.0"],
    python_version="python_3_9_13",
)
```

In extra_pip_dependencies, you can also specify Amazon S3 URIs to import your own modules. Ensure that the Tecton Spark IAM role or your Tecton Databricks instance profile has read access to the S3 URI.

To use your own module, package it as a Python wheel or egg file in S3.
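The two kinds of extra_pip_dependencies entries described above (package requirement strings and S3 URIs pointing at your own packaged modules) can be told apart with a small check. This helper is hypothetical and not part of Tecton; it only encodes this page's guidance that an S3 entry should be a wheel or egg file. The bucket and file names are made up.

```python
from typing import Iterable, List, Tuple


def classify_dependencies(entries: Iterable[str]) -> List[Tuple[str, str]]:
    """Label each entry 'pypi' or 's3'; reject S3 entries that are not .whl/.egg files."""
    labeled = []
    for entry in entries:
        if entry.startswith("s3://"):
            if not entry.endswith((".whl", ".egg")):
                raise ValueError(f"S3 dependency must be a wheel or egg file: {entry}")
            labeled.append((entry, "s3"))
        else:
            labeled.append((entry, "pypi"))
    return labeled


deps = [
    "tensorflow==2.2.0",
    # Hypothetical bucket and wheel name for a module you package yourself.
    "s3://my-bucket/wheels/my_module-0.1.0-py3-none-any.whl",
]
print(classify_dependencies(deps))
```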

Using a third-party dependency inside of a Feature View transformation

In this example, we use the log1p function from TensorFlow in a UDF that is defined inside of a PySpark Feature View transformation.

Note that the import statement has to be within the UDF body; placing it at the top of the file or inside the transformation function will not work.

```python
from datetime import datetime, timedelta
from tecton import batch_feature_view, EMRClusterConfig, Attribute
from tecton.types import Int64
from shared import entities, data_sources

new_emr = EMRClusterConfig(
    instance_type="m4.xlarge",
    number_of_workers=4,
    extra_pip_dependencies=["tensorflow==2.2.0"],
    python_version="python_3_9_13",
)


@batch_feature_view(
    mode="pyspark",
    sources=[data_sources.user_clicks],  # assumes shared.data_sources defines user_clicks
    entities=[entities.product],
    timestamp_field="timestamp",
    features=[Attribute("log1p_clicked", Int64)],
    online=False,
    offline=False,
    feature_start_time=datetime(year=2021, month=1, day=1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    batch_compute=new_emr,
)
def my_example_feature_view(user_clicks):
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    def my_udf(x):
        # The third-party import must live inside the UDF body.
        import tensorflow as tf

        return int(tf.math.log1p(float(x)).numpy())

    extra_udf = F.udf(my_udf, LongType())

    return user_clicks.select(
        "product_id",
        extra_udf("clicked").alias("log1p_clicked"),
        F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
    )
```
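The UDF's arithmetic can be checked locally without Spark or TensorFlow. This sketch swaps tf.math.log1p for the standard library's math.log1p, which computes the same ln(1 + x) for a scalar. Note that int() truncates toward zero, so the feature is the integer part of log1p(clicked).

```python
import math


def my_udf(x):
    # Stand-in for the TensorFlow version above: math.log1p computes ln(1 + x).
    return int(math.log1p(float(x)))


print([my_udf(v) for v in [0, 1, 10, 100]])  # [0, 0, 2, 4]
```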

Examples of @transformation functions

note

The following code is referenced by the examples that follow it.

ds_func is a data source function. The function generates a DataFrame, which is used by the transformations in the examples.

```python
# Imports used by this setup code and by the examples below.
from datetime import datetime, timedelta

from tecton import Aggregate, BatchSource, Entity, batch_feature_view, spark_batch_config, transformation
from tecton.types import Field, Int64, String


@spark_batch_config()
def ds_func(spark):
    import pandas as pd

    content_ids = ["c1", "c2"]
    other_vals = [1, 2]
    ts = [pd.Timestamp("2022-01-01T12"), pd.Timestamp("2022-01-02T12")]
    return spark.createDataFrame(
        pd.DataFrame({"content_id": content_ids, "other_vals": other_vals, "event_timestamp": ts})
    )


test_source = BatchSource(name="test_source", batch_config=ds_func)

content = Entity(name="Content", join_keys=[Field("content_id", String)])
```

A Feature View that calls a pyspark @transformation

```python
@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    features=[Aggregate(input_column=Field("other_vals", Int64), function="sum", time_window=timedelta(days=7))],
    aggregation_interval=timedelta(days=1),
)
def fv(input_df):
    return transform1(input_df)
```

A Feature View that calls two pyspark @transformations with chaining

```python
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    features=[Aggregate(input_column=Field("other_vals", Int64), function="sum", time_window=timedelta(days=7))],
)
def fv(input_df):
    return transform1(add_impression(input_df))
```

A Feature View that calls a pyspark @transformation, which calls a native Python function

```python
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    features=[Aggregate(input_column=Field("other_vals", Int64), function="sum", time_window=timedelta(days=7))],
)
def fv(input_df):
    return transform1(input_df)
```

A pyspark Feature View that calls two native Python functions

```python
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")


@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    features=[Aggregate(input_column=Field("other_vals", Int64), function="sum", time_window=timedelta(days=7))],
)
def fv(input_df):
    intermediate_df = add_impression(input_df)
    return transform1(intermediate_df)
```

A pyspark Feature View that calls a native Python function and invokes the pyspark method withColumn

```python
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")


@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    features=[Aggregate(input_column=Field("other_vals", Int64), function="sum", time_window=timedelta(days=7))],
)
def fv(input_df):
    from pyspark.sql import functions as F

    intermediate_df = input_df.withColumn("impression", F.lit(1))
    return transform1(intermediate_df)
```

A Feature View that calls a pyspark @transformation, passing two pyspark @transformation outputs

```python
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")


@transformation(mode="pyspark")
def pick1(df1, df2):
    return df1


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    features=[Aggregate(input_column=Field("other_vals", Int64), function="sum", time_window=timedelta(days=7))],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))
```

A pipeline Feature View that calls a spark_sql @transformation, passing two pyspark @transformation outputs

```python
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")


@transformation(mode="spark_sql")
def pick1(df1, df2):
    return f"SELECT * FROM {df1}"


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    features=[Aggregate(input_column=Field("other_vals", Int64), function="sum", time_window=timedelta(days=7))],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))
```

A str_split implementation that uses two @transformations

In this example, we implement a generic str_split transformation on a specified column, followed by another transformation that calculates summary statistics for the feature.

Note that passing constants to a transformation requires using const, which can be imported from tecton.

```python
from tecton import transformation, batch_feature_view, const, Attribute
from tecton.types import Field, String, Bool, Array, Int64
from entities import auction
from data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta


@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f"""
        SELECT
            *,
            split({column_to_split}, {delimiter}) AS {new_column_name}
        FROM {input_data}
        """


@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
    return f"""
        SELECT
            auction_id,
            timestamp,
            {keyword_column} AS keyword_list,
            size({keyword_column}) AS num_keywords,
            array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
        FROM {input_data}
        """


@batch_feature_view(
    mode="pipeline",
    sources=[ad_impressions_batch],
    entities=[auction],
    timestamp_field="timestamp",
    features=[
        Attribute("keyword_list", Array(String)),
        Attribute("num_keywords", Int64),
        Attribute("keyword_contains_bitcoin", Bool),
    ],
    batch_schedule=timedelta(days=1),
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 5, 1),
    ttl=timedelta(days=365),
)
def auction_keywords(ad_impressions):
    split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("' '"))
    return keyword_stats(split_keywords, const("keywords"))
```
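Assuming each const value reaches the transformation as a plain Python string, it is interpolated into the SQL text as-is. That is why the delimiter is passed as const("' '"), with its own SQL quotes, rather than as a bare space. The sketch below calls the undecorated str_split body with plain strings in place of the const values; ad_impressions_view is a hypothetical stand-in for the view name Tecton generates.

```python
def str_split(input_data, column_to_split, new_column_name, delimiter):
    # Same body as the spark_sql transformation above, without the decorator.
    return f"""
        SELECT
            *,
            split({column_to_split}, {delimiter}) AS {new_column_name}
        FROM {input_data}
        """


# Plain strings standing in for const("content_keyword"), const("keywords"), const("' '").
sql = str_split("ad_impressions_view", "content_keyword", "keywords", "' '")
print(sql)
```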

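FAQ: Why can’t I directly invoke PySpark methods on the output from a @transformation?

Transformations are intended to be interchangeable within a supported compute environment. For Spark, this means mode="pyspark" and mode="spark_sql" transformations can be mixed. Because an upstream output may come from either mode (a spark_sql transformation returns a SQL string rather than a DataFrame), a mode="pipeline" Feature View only passes transformation outputs between transformations and cannot call PySpark methods on them. For example, this is a valid pipeline: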

@transformation(mode="pyspark")
def transform1(input_df):
return input_df.select("a", "b", "c")


@transformation(mode="spark_sql")
def select_all(df):
return f"SELECT * FROM {df}"


@batch_feature_view(
mode="pipeline",
sources=[test_source],
entities=[content],
feature_start_time=datetime(2022, 2, 1),
timestamp_field="event_timestamp",
aggregation_interval=timedelta(days=1),
features=[Aggregate(input_column=Field("a", Int64), function="count", time_window=datetime.timedelta(days=2))],
)
def fv(input_df):
return select_all(transform1(input_df))
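To see the problem concretely, consider what the spark_sql transformation in this pipeline returns: per the mode table for spark_sql, its output type is a string. Calling the undecorated select_all body as plain Python shows that the result has none of the DataFrame methods, such as select, that a PySpark-style call would need. The view name is a hypothetical stand-in.

```python
def select_all(df):
    # Same body as the spark_sql transformation above, without the decorator.
    return f"SELECT * FROM {df}"


out = select_all("transform1_view")  # hypothetical view name
print(type(out).__name__)  # str
print(hasattr(out, "select"))  # False
```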
