Batch Feature View Examples
Row-Level Transformation
- Rift
- Spark
- Snowflake
from tecton import batch_feature_view, Attribute
from tecton.types import String
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[fraud_users_batch],
entities=[user],
mode="pandas",
online=False,
offline=False,
feature_start_time=datetime(2017, 1, 1),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=3650),
timestamp_field="signup_timestamp",
features=[
Attribute("credit_card_issuer", String),
],
description="User credit card issuer derived from the user credit card number.",
)
def user_credit_card_issuer(fraud_users_batch):
fraud_users_batch["credit_card_issuer"] = fraud_users_batch["cc_num"].apply(
lambda x: "Visa"
if str(x)[0] == "4"
else "MasterCard"
if str(x)[0] == "5"
else "Discover"
if str(x)[0] == "6"
else "other"
)
return fraud_users_batch[["user_id", "signup_timestamp", "credit_card_issuer"]]
from tecton import batch_feature_view, Attribute
from tecton.types import String
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[fraud_users_batch],
entities=[user],
mode="spark_sql",
online=False,
offline=False,
# Note the timestamp is the signup date, hence the old start_time.
feature_start_time=datetime(2017, 1, 1),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=3650),
timestamp_field="signup_timestamp",
features=[
Attribute("credit_card_issuer", String),
],
description="User credit card issuer derived from the user credit card number.",
)
def user_credit_card_issuer(fraud_users_batch):
return f"""
SELECT
user_id,
signup_timestamp,
CASE SUBSTRING(CAST(cc_num AS STRING), 0, 1)
WHEN '4' THEN 'Visa'
WHEN '5' THEN 'MasterCard'
WHEN '6' THEN 'Discover'
ELSE 'other'
END as credit_card_issuer
FROM
{fraud_users_batch}
"""
from tecton import batch_feature_view, Attribute
from tecton.types import String
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[fraud_users_batch],
entities=[user],
mode="snowflake_sql",
online=False,
offline=False,
# Note the timestamp is the signup date, hence the old start_time.
feature_start_time=datetime(2017, 1, 1),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=3650),
timestamp_field="signup_timestamp",
features=[
Attribute("credit_card_issuer", String),
],
description="User credit card issuer derived from the user credit card number.",
)
def user_credit_card_issuer(fraud_users_batch):
return f"""
SELECT
user_id,
signup_timestamp,
CASE SUBSTRING(CAST(cc_num AS STRING), 0, 1)
WHEN '4' THEN 'Visa'
WHEN '5' THEN 'MasterCard'
WHEN '6' THEN 'Discover'
ELSE 'other'
END as credit_card_issuer
FROM
{fraud_users_batch}
"""
Time-Windowed Aggregations
- Rift
- Spark
- Snowflake
from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="pandas",
aggregation_interval=timedelta(days=1),
timestamp_field="timestamp",
features=[
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=1)),
),
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=30)),
),
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=90)),
),
],
online=True,
offline=True,
feature_start_time=datetime(2022, 5, 1),
description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
transactions["transaction"] = 1
return transactions[["user_id", "transaction", "timestamp"]]
from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from fraud.data_sources.transactions import transactions_batch
from fraud.entities import user
from datetime import datetime, timedelta
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="spark_sql",
aggregation_interval=timedelta(days=1),
timestamp_field="timestamp",
features=[
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=1)),
),
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=30)),
),
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=90)),
),
],
online=True,
offline=True,
feature_start_time=datetime(2022, 5, 1),
description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
return f"""
SELECT
user_id,
1 as transaction,
timestamp
FROM
{transactions}
"""
from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from fraud.data_sources.transactions import transactions_batch
from fraud.entities import user
from datetime import datetime, timedelta
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="snowflake_sql",
aggregation_interval=timedelta(days=1),
timestamp_field="timestamp",
features=[
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=1)),
),
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=30)),
),
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=90)),
),
],
online=True,
offline=True,
feature_start_time=datetime(2022, 5, 1),
description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
return f"""
SELECT
user_id,
1 as transaction,
timestamp
FROM
{transactions}
"""
Row-Level PySpark Transformation
from tecton import batch_feature_view, Attribute
from tecton.types import String
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[fraud_users_batch],
entities=[user],
mode="pyspark",
online=False,
offline=False,
timestamp_field="timestamp",
features=[Attribute("user_date_of_birth", String)],
# Note the timestamp is the signup date, hence the old start_time.
feature_start_time=datetime(2017, 1, 1),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=3650),
description="User date of birth, entered at signup.",
)
def user_date_of_birth(fraud_users_batch):
from pyspark.sql import functions as f
return (
fraud_users_batch.withColumn("user_date_of_birth", f.date_format(f.col("dob"), "yyyy-MM-dd"))
.withColumnRenamed("signup_timestamp", "timestamp")
.select("user_id", "user_date_of_birth", "timestamp")
)
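The column logic above can also be tried on a local SparkSession before registering the feature view. A minimal sketch with made-up rows (requires a local pyspark installation; not part of the feature repository):
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Toy rows standing in for fraud_users_batch (values are made up).
df = spark.createDataFrame(
    [("u1", "2017-03-15 10:00:00", "1990-03-15")],
    ["user_id", "signup_timestamp", "dob"],
)

out = (
    df.withColumn("user_date_of_birth", f.date_format(f.col("dob"), "yyyy-MM-dd"))
    .withColumnRenamed("signup_timestamp", "timestamp")
    .select("user_id", "user_date_of_birth", "timestamp")
)
out.show()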
Custom Aggregations
- Spark
- Snowflake
from tecton import batch_feature_view, Attribute, TectonTimeConstant
from tecton.types import Int64
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[
transactions_batch.select_range(
start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
)
],
entities=[user],
mode="spark_sql",
online=True,
offline=True,
feature_start_time=datetime(2022, 4, 1),
timestamp_field="timestamp",
features=[Attribute("distinct_merchant_transaction_count_30d", Int64)],
incremental_backfills=True,
batch_schedule=timedelta(days=1),
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context):
return f"""
SELECT
user_id,
TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND as timestamp,
COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
FROM {transactions_batch}
GROUP BY
user_id
"""
from tecton import batch_feature_view, Attribute, TectonTimeConstant
from tecton.types import Int64
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[
transactions_batch.select_range(
start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
)
],
entities=[user],
mode="snowflake_sql",
online=True,
offline=True,
feature_start_time=datetime(2022, 4, 1),
timestamp_field="timestamp",
features=[Attribute("distinct_merchant_transaction_count_30d", Int64)],
incremental_backfills=True,
batch_schedule=timedelta(days=1),
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context):
return f"""
SELECT
user_id,
TIMESTAMPADD(microsecond, -1, TO_TIMESTAMP('{context.end_time}')) as timestamp,
COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
FROM {transactions_batch}
GROUP BY
user_id
"""
Modular Transformation Pipeline
- Spark
- Snowflake
from tecton import transformation, batch_feature_view, const, Attribute
from tecton.types import Array, String, Int64, Bool
from ads.entities import auction
from ads.data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta
# Create new column by splitting the string in an existing column.
@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
return f"""
SELECT
*,
split({column_to_split}, {delimiter}) AS {new_column_name}
FROM {input_data}
"""
# Create features based on the keyword array
@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
return f"""
SELECT
auction_id,
timestamp,
{keyword_column} AS keyword_list,
size({keyword_column}) AS num_keywords,
array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
FROM {input_data}
"""
# This feature view runs in pipeline mode to turn the keyword string into an
# array of words, then create metrics based on that array.
@batch_feature_view(
mode="pipeline",
sources=[ad_impressions_batch],
entities=[auction],
ttl=timedelta(days=1),
batch_schedule=timedelta(days=1),
online=False,
offline=False,
feature_start_time=datetime(2022, 5, 1),
timestamp_field="timestamp",
features=[
Attribute("keyword_list", Array(String)),
Attribute("num_keywords", Int64),
Attribute("keyword_contains_bitcoin", Bool),
],
)
def auction_keywords(ad_impressions):
split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("' '"))
return keyword_stats(split_keywords, const("keywords"))
from tecton import transformation, batch_feature_view, const, Attribute
from tecton.types import Array, String, Int64, Bool
from ads.entities import auction
from ads.data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta
# Create new column by splitting the string in an existing column.
@transformation(mode="snowflake_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
return f"""
SELECT
*,
split({column_to_split}, {delimiter}) AS {new_column_name}
FROM {input_data}
"""
# Create features based on the keyword array
@transformation(mode="snowflake_sql")
def keyword_stats(input_data, keyword_column):
return f"""
SELECT
auction_id,
timestamp,
{keyword_column} AS keyword_list,
array_size({keyword_column}) AS num_keywords,
array_contains('bitcoin'::variant, {keyword_column}) AS keyword_contains_bitcoin
FROM {input_data}
"""
# This feature view runs in pipeline mode to turn the keyword string into an
# array of words, then create metrics based on that array.
@batch_feature_view(
mode="pipeline",
sources=[ad_impressions_batch],
entities=[auction],
ttl=timedelta(days=1),
batch_schedule=timedelta(days=1),
online=False,
offline=False,
feature_start_time=datetime(2022, 5, 1),
timestamp_field="timestamp",
features=[
Attribute("keyword_list", Array(String)),
Attribute("num_keywords", Int64),
Attribute("keyword_contains_bitcoin", Bool),
],
)
def auction_keywords(ad_impressions):
split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("' '"))
return keyword_stats(split_keywords, const("keywords"))
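In pipeline mode the feature view function only wires transformations together: the SQL returned by str_split becomes the input relation of keyword_stats. As a rough illustration of that composition outside Tecton (the helper names below are made up, and the exact subquery aliasing Tecton generates may differ):
# Illustrative only: mimic how the two SQL transformations nest.
def str_split_sql(input_data, column_to_split, new_column_name, delimiter):
    return f"SELECT *, split({column_to_split}, {delimiter}) AS {new_column_name} FROM {input_data}"


def keyword_stats_sql(input_data, keyword_column):
    return (
        f"SELECT auction_id, timestamp, {keyword_column} AS keyword_list, "
        f"size({keyword_column}) AS num_keywords, "
        f"array_contains({keyword_column}, 'bitcoin') AS keyword_contains_bitcoin "
        f"FROM ({input_data})"
    )


inner = str_split_sql("ad_impressions_batch", "content_keyword", "keywords", "' '")
print(keyword_stats_sql(inner, "keywords"))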
Multiple Data Sources
- Spark
- Snowflake
from tecton import batch_feature_view, Attribute
from tecton.types import Int64
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[transactions_batch, fraud_users_batch.unfiltered()],
entities=[user],
mode="spark_sql",
online=False,
offline=False,
feature_start_time=datetime(2022, 5, 1),
timestamp_field="timestamp",
features=[Attribute("user_is_adult", Int64)],
batch_schedule=timedelta(days=1),
ttl=timedelta(days=100),
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
return f"""
select
timestamp,
t.user_id,
IF (datediff(timestamp, to_date(dob)) > (18*365), 1, 0) as user_is_adult
from {transactions_batch} t join {fraud_users_batch} u on t.user_id=u.user_id
"""
from tecton import batch_feature_view, Attribute
from tecton.types import Int64
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[transactions_batch, fraud_users_batch.unfiltered()],
entities=[user],
mode="snowflake_sql",
online=False,
offline=False,
feature_start_time=datetime(2022, 5, 1),
timestamp_field="timestamp",
features=[Attribute("user_is_adult", Int64)],
batch_schedule=timedelta(days=1),
ttl=timedelta(days=100),
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
return f"""
select
timestamp,
t.user_id,
IFF(datediff(day, to_date(dob), timestamp) > (18*365), 1, 0) as user_is_adult
from {transactions_batch} t join {fraud_users_batch} u on t.user_id=u.user_id
"""