tecton.transformation

tecton.transformation(mode, name=None, description=None, owner=None, tags=None)

Declares a Transformation that wraps a user function. Transformations are assembled in a pipeline function of a Feature View.

Parameters
  • mode (str) – The mode for this transformation. Must be one of “spark_sql”, “pyspark”, “snowflake_sql”, “snowpark”, “athena”, “pandas”, or “python”.

  • name (Optional[str]) – Unique, human-friendly name that identifies the Transformation. Defaults to the function name.

  • description (Optional[str]) – A human-readable description.

  • owner (Optional[str]) – Owner name (typically the email of the primary maintainer).

  • tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).

Returns

A wrapped transformation
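
To make the parameter semantics above concrete, here is a minimal, self-contained sketch of a parameterized decorator with the same signature. It is a toy stand-in and not Tecton's implementation: the name toy_transformation, the ALLOWED_MODES set, the ValueError on an unknown mode, and the metadata attribute are all assumptions made for illustration. It only shows how a name can default to the function name and how the mode can be checked against the list above.

```python
import functools

# Modes listed in the parameter description above.
ALLOWED_MODES = {
    "spark_sql", "pyspark", "snowflake_sql", "snowpark",
    "athena", "pandas", "python",
}


def toy_transformation(mode, name=None, description=None, owner=None, tags=None):
    """Toy stand-in for a parameterized decorator; NOT Tecton's implementation."""
    if mode not in ALLOWED_MODES:
        raise ValueError(f"unsupported mode: {mode!r}")

    def decorator(user_function):
        @functools.wraps(user_function)
        def wrapped(*args, **kwargs):
            # Delegate to the user function unchanged.
            return user_function(*args, **kwargs)

        # Hypothetical attribute, used only to expose the metadata in this sketch.
        wrapped.metadata = {
            "mode": mode,
            "name": name or user_function.__name__,  # default: the function name
            "description": description,
            "owner": owner,
            "tags": dict(tags or {}),
        }
        return wrapped

    return decorator


@toy_transformation(mode="python", owner="maintainer@example.com", tags={"team": "risk"})
def amount_is_high(transaction_request):
    return {"amount_is_high": int(transaction_request["amount"] >= 10000)}


print(amount_is_high.metadata["name"])
print(amount_is_high({"amount": 20000}))
```

Because no name is passed in the usage above, the recorded name falls back to the function's own name, mirroring the documented default.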

Examples of Spark SQL, Athena, PySpark, Pandas, and Python transformation declarations:

from tecton import transformation
from pyspark.sql import DataFrame
import pandas as pd

# Create a Spark SQL transformation.
@transformation(mode="spark_sql",
                description="Create new column by splitting the string in an existing column")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f'''
        SELECT
            *,
            split({column_to_split}, {delimiter}) AS {new_column_name}
        FROM {input_data}
    '''

# Create an Athena transformation.
@transformation(mode="athena",
                description="Create new column by splitting the string in an existing column")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f'''
        SELECT
            *,
            split({column_to_split}, '{delimiter}') AS {new_column_name}
        FROM {input_data}
    '''

# Create a PySpark transformation.
@transformation(mode="pyspark",
                description="Add a new column 'user_has_good_credit' if score is > 670")
def user_has_good_credit_transformation(credit_scores):
    from pyspark.sql import functions as F

    # Flag users whose credit score is above 670.
    df = credit_scores.withColumn("user_has_good_credit",
        F.when(credit_scores["credit_score"] > 670, 1).otherwise(0))
    return df.select("user_id", df["date"].alias("timestamp"), "user_has_good_credit")

# Create a Pandas transformation.
@transformation(mode="pandas",
                description="Whether the transaction amount is considered high (over $10000)")
def transaction_amount_is_high(transaction_request):
    import pandas as pd

    df = pd.DataFrame()
    df['amount_is_high'] = (transaction_request['amount'] >= 10000).astype('int64')
    return df

# Create a Python transformation.
@transformation(mode="python",
                description="Whether the transaction amount is considered high (over $10000)")
def transaction_amount_is_high(transaction_request):

    result = {}
    result['transaction_amount_is_high'] = int(transaction_request['amount'] >= 10000)
    return result
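
The function bodies in the examples above are ordinary Python, so their logic can be sanity-checked on sample inputs before they are decorated. The sketch below does this without importing Tecton: it copies the Athena SQL template and the Python-mode logic into plain functions. The function names render_str_split and amount_is_high_logic and the table and column names are hypothetical and chosen only for illustration. It does not show how Tecton itself substitutes inputs at materialization time.

```python
def render_str_split(input_data, column_to_split, new_column_name, delimiter):
    # Same template as the Athena example: the function returns SQL text.
    return f'''
        SELECT
            *,
            split({column_to_split}, '{delimiter}') AS {new_column_name}
        FROM {input_data}
    '''


def amount_is_high_logic(transaction_request):
    # Same logic as the Python-mode example: a dict in, a dict out.
    result = {}
    result['transaction_amount_is_high'] = int(transaction_request['amount'] >= 10000)
    return result


# Hypothetical table and column names, used only for this check.
sql = render_str_split("users", "full_name", "name_parts", " ")
print(sql)

low = amount_is_high_logic({"amount": 250})
high = amount_is_high_logic({"amount": 10000})
print(low, high)
```

Note that the comparison is `>=`, so an amount of exactly 10000 is flagged as high in this check.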