Skip to main content
Version: 0.4

Defining Transformations Outside of Feature Views

Compared to defining a transformation inside of a Feature View, the main advantages of defining a transformation outside of a Feature View are:

  • Reusability
    • Transformations can be reused by multiple Feature Views.
    • A Feature View can call multiple transformations.
  • Discoverability: Transformations can be searched in the Web UI.

To define a transformation outside of a Feature View, define a function using the @transformation decorator, with mode set to a supported transformation mode. For example:

@transformation(mode="spark_sql")
def my_transformation(input_data):
    """Rename two raw columns into feature columns with Spark SQL.

    Args:
        input_data: Upstream data; it is interpolated into the FROM clause.

    Returns:
        A Spark SQL query string selecting ``entity_id``, ``timestamp`` and
        ``column_a``/``column_b`` renamed to ``feature_one``/``feature_two``.
    """
    query = f"""
SELECT
entity_id,
timestamp,
column_a AS feature_one,
column_b AS feature_two
FROM {input_data}
"""
    return query

In the definition of the Feature View that will use the transformation, specify mode="pipeline", and call the transformation function from the Feature View function.

@batch_feature_view(
    mode="pipeline",
    # ...remaining Feature View parameters (sources, entities, etc.) elided.
    # A literal `...` positional argument after `mode=` is a SyntaxError
    # ("positional argument follows keyword argument"), so the elision is
    # written as a comment to keep the snippet valid Python.
)
def my_feature_view(input_data):
    """Pipeline-mode Feature View that delegates to ``my_transformation``."""
    return my_transformation(input_data)

Guidelines for using @transformation functions

  • @transformation function outputs can only be referenced inside mode="pipeline" Feature View definitions.
  • A @transformation function cannot reference another @transformation function. Instead, declare chained invocations entirely inside a mode="pipeline" Feature View definition. Alternatively, convert the upstream @transformations to native Python functions so they can be referenced inside a @transformation.
  • mode="pipeline" Feature View definitions cannot invoke any PySpark operations. They can only pass DataFrame objects and Tecton const types in and out of upstream @transformations.

An alternative to using a @transformation function

As an alternative to implementing a transformation function using the @transformation decorator, a native Python function can be used. Native Python functions:

  • Can be called from inside Feature View definitions (that use any mode)
  • Can be called from @transformation functions.
  • Are not discoverable in the Web UI.

A Feature View that calls a pyspark @transformation

@transformation(mode="pyspark")
def transform1(input_df):
    """Project the input down to the columns used by the Feature View.

    Args:
        input_df: The upstream DataFrame handed to this pyspark transformation.

    Returns:
        A DataFrame with only ``content_id``, ``event_timestamp`` and
        ``other_vals``.
    """
    # Select from this function's own argument. The earlier version referenced
    # an undefined name ``df2`` and would raise NameError when executed.
    return input_df.select("content_id", "event_timestamp", "other_vals")


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    """Pipeline Feature View: 7-day sum of ``other_vals`` after ``transform1``."""
    projected = transform1(input_df)
    return projected

A Feature View that calls two pyspark @transformations with chaining

@transformation(mode="pyspark")
def add_impression(input_df):
    """Return ``input_df`` with a constant ``impression`` column equal to 1."""
    # Imported inside the function body, as in the other pyspark examples.
    from pyspark.sql import functions as F

    one = F.lit(1)
    return input_df.withColumn("impression", one)


@transformation(mode="pyspark")
def transform1(input_df):
    """Keep only ``content_id``, ``event_timestamp`` and ``other_vals``."""
    kept_columns = ("content_id", "event_timestamp", "other_vals")
    return input_df.select(*kept_columns)


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    """Chain two @transformations: ``add_impression`` feeds ``transform1``."""
    with_impression = add_impression(input_df)
    return transform1(with_impression)

A Feature View that calls a pyspark @transformation, which calls a native Python function

def add_impression(input_df):
    """Plain Python helper: add a constant ``impression`` column (value 1)."""
    from pyspark.sql import functions as F

    constant_one = F.lit(1)
    return input_df.withColumn("impression", constant_one)


@transformation(mode="pyspark")
def transform1(input_df):
    """Add the impression column via a native helper, then project columns."""
    # A @transformation may call a native Python function (but not another
    # @transformation).
    enriched = add_impression(input_df)
    return enriched.select("content_id", "event_timestamp", "other_vals")


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    """Pipeline Feature View backed by the single ``transform1`` transformation."""
    result = transform1(input_df)
    return result

A pyspark Feature View that calls two native Python functions

def add_impression(input_df):
    """Native helper returning ``input_df`` plus an ``impression`` column of 1s."""
    from pyspark.sql import functions as F

    impression_value = F.lit(1)
    return input_df.withColumn("impression", impression_value)


def transform1(input_df):
    """Return ``input_df`` narrowed to the three columns the Feature View uses."""
    wanted = ["content_id", "event_timestamp", "other_vals"]
    return input_df.select(*wanted)


@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    """PySpark-mode Feature View composed from two native Python helpers."""
    return transform1(add_impression(input_df))

A pyspark Feature View that calls a native Python function and invokes the pyspark method withColumn

def add_impression(input_df):
    """Native helper: attach an ``impression`` column holding the literal 1."""
    from pyspark.sql import functions as F

    lit_one = F.lit(1)
    return input_df.withColumn("impression", lit_one)


def transform1(input_df):
    """Native helper: add the impression column, then keep three columns."""
    with_impression = add_impression(input_df)
    return with_impression.select("content_id", "event_timestamp", "other_vals")


@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    """PySpark-mode Feature View mixing a direct ``withColumn`` call with a helper."""
    from pyspark.sql import functions as F

    # PySpark methods may be called directly here because mode="pyspark".
    # NOTE: transform1 also calls add_impression, which writes the same
    # "impression" column (value 1) again; withColumn replaces the existing column.
    impression_flag = F.lit(1)
    intermediate_df = input_df.withColumn("impression", impression_flag)
    return transform1(intermediate_df)

A Feature View that calls a pyspark @transformation, passing two pyspark @transformation outputs

@transformation(mode="pyspark")
def add_impression(input_df):
    """Return ``input_df`` with an added ``impression`` column fixed at 1."""
    from pyspark.sql import functions as F

    one = F.lit(1)
    return input_df.withColumn("impression", one)


@transformation(mode="pyspark")
def transform1(input_df):
    """Project ``input_df`` to ``content_id``, ``event_timestamp``, ``other_vals``."""
    columns = ("content_id", "event_timestamp", "other_vals")
    return input_df.select(*columns)


@transformation(mode="pyspark")
def pick1(df1, df2):
return df1


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    """Feed two pyspark transformation outputs into the ``pick1`` transformation."""
    # Evaluated in the same order as the arguments: transform1 first, then add_impression.
    projected = transform1(input_df)
    with_impression = add_impression(input_df)
    return pick1(projected, with_impression)

A pipeline Feature View that calls a spark_sql @transformation, passing two pyspark @transformation outputs

@transformation(mode="pyspark")
def add_impression(input_df):
    """Append a constant-valued (1) ``impression`` column to ``input_df``."""
    from pyspark.sql import functions as F

    value = F.lit(1)
    return input_df.withColumn("impression", value)


@transformation(mode="pyspark")
def transform1(input_df):
    """Reduce ``input_df`` to the three columns consumed downstream."""
    selection = ["content_id", "event_timestamp", "other_vals"]
    return input_df.select(*selection)


@transformation(mode="spark_sql")
def pick1(df1, df2):
    """Spark SQL transformation that selects everything from ``df1``.

    ``df2`` is accepted (showing a two-input transformation) but is not
    referenced by the query.
    """
    query = f"SELECT * FROM {df1}"
    return query


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    """Mix modes: two pyspark outputs are passed to a spark_sql transformation."""
    # Same evaluation order as the original single-expression form.
    first = transform1(input_df)
    second = add_impression(input_df)
    return pick1(first, second)

A str_split implementation that uses two @transformations

In this example, we implement a generic str_split transformation on a specified column, followed by another transformation to calculate some summary statistics for the feature.

Note that passing a constant (such as a column name or a delimiter) to a transformation requires wrapping it in const, which can be imported from tecton.

from datetime import datetime, timedelta

from tecton import transformation, batch_feature_view, const, FilteredSource

from data_sources.ad_impressions import ad_impressions_batch
from entities import auction


@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    """Keep every input column and add the result of Spark SQL ``split``.

    Args:
        input_data: Upstream data, interpolated into the FROM clause.
        column_to_split: Column expression passed as the first ``split`` argument.
        new_column_name: Alias given to the ``split`` result.
        delimiter: Inserted verbatim into the SQL, so it must already be a
            valid SQL literal (e.g. the quoted ``' '`` used by
            ``auction_keywords``).

    Returns:
        A Spark SQL query string.
    """
    sql = f"""
SELECT
*,
split({column_to_split}, {delimiter}) AS {new_column_name}
FROM {input_data}
"""
    return sql


@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
    """Compute summary features from a keyword-list column.

    Args:
        input_data: Upstream data, interpolated into the FROM clause.
        keyword_column: Column holding the keyword list (an array, e.g. the
            output of ``str_split``), summarized with ``size`` and
            ``array_contains``.

    Returns:
        A Spark SQL query string producing ``auction_id``, ``timestamp``,
        ``keyword_list``, ``num_keywords`` and ``keyword_contains_bitcoin``.
    """
    sql = f"""
SELECT
auction_id,
timestamp,
{keyword_column} AS keyword_list,
size({keyword_column}) AS num_keywords,
array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
FROM {input_data}
"""
    return sql


@batch_feature_view(
    mode="pipeline",
    sources=[FilteredSource(ad_impressions_batch)],
    entities=[auction],
    batch_schedule=timedelta(days=1),
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 5, 1),
    ttl=timedelta(days=365),
)
def auction_keywords(ad_impressions):
    """Split ``content_keyword`` on spaces, then derive keyword statistics.

    Literal arguments to @transformations must be wrapped in ``const``.
    """
    split_keywords = str_split(
        ad_impressions,
        const("content_keyword"),  # column to split
        const("keywords"),  # name of the new array column
        const("' '"),  # delimiter, already quoted as a SQL string literal
    )
    return keyword_stats(split_keywords, const("keywords"))

FAQ: Why can’t I directly invoke PySpark methods on the output from a @transformation?

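Transformations are intended to be interchangeable within a supported compute environment. For Spark, this means mode="pyspark" and mode="spark_sql" transformations can be mixed: the output of one @transformation may be passed to a transformation of either mode. Because of this, a mode="pipeline" Feature View cannot assume that a @transformation's output is a PySpark DataFrame. Apply PySpark methods inside a mode="pyspark" @transformation (or in a native Python function it calls) instead of calling them directly on the output. For example, this is a completely valid pipeline: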

@transformation(mode="pyspark")
def transform1(input_df):
    """PySpark transformation selecting columns ``a``, ``b`` and ``c``."""
    projection = ("a", "b", "c")
    return input_df.select(*projection)


@transformation(mode="spark_sql")
def select_all(df):
    """Spark SQL transformation returning every row and column of ``df``."""
    query = f"SELECT * FROM {df}"
    return query


@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    """Pipe a pyspark transformation's output into a spark_sql transformation."""
    narrowed = transform1(input_df)
    return select_all(narrowed)

Was this page helpful?

Happy React is loading...