Batch Feature View
A Batch Feature View defines transformations against one or many Batch Data Sources (e.g. Snowflake, Redshift, BigQuery, S3 etc.) and computes features on a schedule. They can also run automatic backfills when they’re first created.
For information about connecting Tecton to your data sources, see
Connect to Data.
Feature Examples:
- determining if a user's credit score is over a pre-defined threshold
- counting the total number of transactions over a time window in batch
- batch ingesting pre-computed feature values from an existing batch source
Batch Feature Transformations
Batch Feature Views can define row-level transformations for filtering and projection. They can also define aggregations using Tecton's Aggregation Engine or plain SQL.
We will look at them one by one.
Row-Level Transformations
Batch Feature Views can define row-level filtering or projection transformations using Pandas, Snowflake SQL, or Spark.
Below is an example that takes in a Batch Data Source, `transactions_batch`, and applies a row-level projection that selects only three columns: `user_id`, `timestamp`, and `amt`. The `batch_schedule` parameter tells Tecton to schedule a batch materialization job once a day.
- Rift
- Spark
- Snowflake
from tecton import batch_feature_view, Attribute
from tecton.types import Float64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
# Rift (pandas) variant: a row-level projection materialized once a day.
@batch_feature_view(
    sources=[transactions_batch],
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    batch_schedule=timedelta(days=1),
    features=[Attribute("amt", Float64)],
)
def user_last_transaction_amount(transactions_batch):
    """Project the transactions frame down to the columns this view uses.

    Args:
        transactions_batch: Input frame for the `transactions_batch` source;
            it must contain `user_id`, `timestamp`, and `amt` columns.

    Returns:
        The input restricted to `user_id`, `timestamp`, and `amt`.
    """
    selected_columns = ["user_id", "timestamp", "amt"]
    return transactions_batch[selected_columns]
from tecton import batch_feature_view, Attribute
from tecton.types import Float64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
# Spark SQL variant: a row-level projection materialized once a day.
@batch_feature_view(
    sources=[transactions_batch],
    entities=[user],
    mode="spark_sql",
    timestamp_field="TIMESTAMP",
    batch_schedule=timedelta(days=1),
    features=[Attribute("AMT", Float64)],
)
def user_last_transaction_amount(transactions_batch):
    """Build the Spark SQL query that selects USER_ID, AMT, and TIMESTAMP.

    Args:
        transactions_batch: Value interpolated into the FROM clause.

    Returns:
        The SQL query text.
    """
    # The query body stays flush-left so the emitted string is unchanged.
    projection_query = f"""
SELECT
USER_ID,
AMT,
TIMESTAMP
FROM
{transactions_batch}
"""
    return projection_query
from tecton import batch_feature_view, Attribute
from tecton.types import Float64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
# Snowflake SQL variant: a row-level projection materialized once a day.
@batch_feature_view(
    sources=[transactions_batch],
    entities=[user],
    mode="snowflake_sql",
    timestamp_field="TIMESTAMP",
    batch_schedule=timedelta(days=1),
    features=[Attribute("AMT", Float64)],
)
def user_last_transaction_amount(transactions_batch):
    """Build the Snowflake SQL query that selects USER_ID, AMT, and TIMESTAMP.

    Args:
        transactions_batch: Value interpolated into the FROM clause.

    Returns:
        The SQL query text.
    """
    # The query body stays flush-left so the emitted string is unchanged.
    projection_query = f"""
SELECT
USER_ID,
AMT,
TIMESTAMP
FROM
{transactions_batch}
"""
    return projection_query
Batch Feature Views can also be used to perform row-level transformations on
multiple Data Sources joined together. In the example below, two data sources,
`transactions_batch` and `fraud_users_batch`, are joined in a single Feature View.
Date of birth information from `fraud_users_batch` is combined with transaction
information from `transactions_batch` to determine if a user was an adult at the
time of a given transaction.
- Rift
- Spark
- Snowflake
from tecton import batch_feature_view, Attribute
from tecton.types import Bool
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Bool
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
    sources=[transactions_batch, fraud_users_batch],
    entities=[user],
    mode="pandas",
    batch_schedule=timedelta(days=1),
    timestamp_field="timestamp",
    features=[
        # The transformation stores the result of a `>` comparison in this
        # column, so it is declared as Bool (which this snippet imports).
        Attribute("user_is_adult", Bool),
    ],
    description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
    """Flag, per transaction, whether the user was over 18 at transaction time.

    Args:
        transactions_batch: Frame with at least `user_id` and `timestamp`.
        fraud_users_batch: Frame with at least `user_id` and `dob`.

    Returns:
        Frame with `user_id`, `timestamp`, and the boolean `user_is_adult`.
    """
    # Local import: the snippet has no module-level pandas import.
    import pandas as pd

    # Both inputs are updated in place with parsed datetime columns.
    fraud_users_batch["dob"] = pd.to_datetime(fraud_users_batch["dob"])
    transactions_batch["timestamp"] = pd.to_datetime(transactions_batch["timestamp"])
    # pd.merge defaults to an inner join, so transactions with no matching
    # user row are dropped.
    joined_df = pd.merge(transactions_batch, fraud_users_batch, on="user_id")
    # Age is approximated as 18 * 365 days; leap days are not accounted for.
    joined_df["user_is_adult"] = (joined_df["timestamp"] - joined_df["dob"]).dt.days > 18 * 365
    return joined_df[["user_id", "timestamp", "user_is_adult"]]
from tecton import batch_feature_view, Attribute
from tecton.types import Bool
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Bool
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
    sources=[transactions_batch, fraud_users_batch],
    entities=[user],
    mode="spark_sql",
    batch_schedule=timedelta(days=1),
    timestamp_field="timestamp",
    features=[
        # The query yields the result of a `>` comparison for this column,
        # so it is declared as Bool (which this snippet imports).
        Attribute("user_is_adult", Bool),
    ],
    description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
    """Build the Spark SQL query flagging whether the user was over 18.

    Args:
        transactions_batch: Value interpolated as the transactions relation
            (aliased `t`).
        fraud_users_batch: Value interpolated as the users relation
            (aliased `u`).

    Returns:
        SQL text selecting `timestamp`, `user_id`, and `user_is_adult`.
    """
    # Spark's datediff(end, start) returns days, compared here against an
    # 18 * 365 day approximation of 18 years.
    return f"""
select
timestamp,
t.user_id,
datediff(timestamp, to_date(dob)) > (18*365) as user_is_adult
from {transactions_batch} t join {fraud_users_batch} u on t.user_id=u.user_id
"""
from tecton import batch_feature_view, Attribute
from tecton.types import Bool
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Bool
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
    sources=[transactions_batch, fraud_users_batch],
    entities=[user],
    mode="snowflake_sql",
    batch_schedule=timedelta(days=1),
    timestamp_field="timestamp",
    features=[
        # The query yields the result of a `>` comparison for this column,
        # so it is declared as Bool (which this snippet imports).
        Attribute("user_is_adult", Bool),
    ],
    description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
    """Build the Snowflake SQL query flagging whether the user was over 18.

    Args:
        transactions_batch: Value interpolated as the transactions relation
            (aliased `t`).
        fraud_users_batch: Value interpolated as the users relation
            (aliased `u`).

    Returns:
        SQL text selecting `timestamp`, `user_id`, and `user_is_adult`.
    """
    # Snowflake's DATEDIFF(part, start, end) returns end - start in `part`
    # units. The threshold is a day count (18 * 365), so the difference is
    # taken in days, from date of birth to the transaction timestamp.
    return f"""
select
timestamp,
t.user_id,
datediff(day, to_date(dob), timestamp) > (18*365) as user_is_adult
from {transactions_batch} t join {fraud_users_batch} u on t.user_id=u.user_id
"""
Time-Window aggregations
Batch Feature Views can also run aggregations over multiple rows in a dataset
using Tecton's Aggregation Engine. The example below takes the same
`transactions_batch` data source, extracts three columns (`user_id`,
`transaction`, `timestamp`), and generates partial aggregates, which are rolled
up at retrieval time to generate 3 features: `transaction_count_1d_1d`,
`transaction_count_30d_1d`, and `transaction_count_90d_1d`.
The `aggregation_interval` parameter defines the interval by which the
aggregation time window advances (e.g., 1 day, in the example below).
- Rift
- Spark
- Snowflake
from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
@batch_feature_view(
    sources=[transactions_batch],
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp",
    aggregation_interval=timedelta(days=1),
    # One "count" aggregation per trailing window: 1, 30, and 90 days.
    features=[
        Aggregate(
            input_column=Field("transaction", Int64),
            function="count",
            time_window=TimeWindow(window_size=timedelta(days=window_days)),
        )
        for window_days in (1, 30, 90)
    ],
    description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
    """Emit one countable row per transaction.

    Args:
        transactions: Input frame with `user_id` and `timestamp` columns.
            A constant `transaction` column is written onto it in place.

    Returns:
        The frame restricted to `user_id`, `transaction`, and `timestamp`.
    """
    # Every row gets the same marker value, giving the "count" aggregation
    # a column to count.
    transactions["transaction"] = 1
    output_columns = ["user_id", "transaction", "timestamp"]
    return transactions[output_columns]
from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
# NOTE(review): this function name matches the row-level example, while the
# Rift variant of this same view is called `user_transaction_counts`. It
# looks like a copy-paste leftover — confirm before renaming.
@batch_feature_view(
    sources=[transactions_batch],
    entities=[user],
    mode="spark_sql",
    timestamp_field="TIMESTAMP",
    aggregation_interval=timedelta(days=1),
    # One "count" aggregation per trailing window: 1, 30, and 90 days.
    features=[
        Aggregate(
            input_column=Field("TRANSACTION", Int64),
            function="count",
            time_window=TimeWindow(window_size=timedelta(days=window_days)),
        )
        for window_days in (1, 30, 90)
    ],
    description="User transaction totals over a series of time windows, updated daily.",
)
def user_last_transaction_amount(transactions_batch):
    """Build the Spark SQL query feeding the transaction-count aggregations.

    Args:
        transactions_batch: Value interpolated into the FROM clause.

    Returns:
        SQL text selecting USER_ID, TRANSACTION, and TIMESTAMP.
    """
    # NOTE(review): assumes the source exposes a TRANSACTION column — confirm.
    # The query body stays flush-left so the emitted string is unchanged.
    counts_query = f"""
SELECT
USER_ID,
TRANSACTION,
TIMESTAMP
FROM
{transactions_batch}
"""
    return counts_query
from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
# NOTE(review): this function name matches the row-level example, while the
# Rift variant of this same view is called `user_transaction_counts`. It
# looks like a copy-paste leftover — confirm before renaming.
@batch_feature_view(
    sources=[transactions_batch],
    entities=[user],
    mode="snowflake_sql",
    timestamp_field="TIMESTAMP",
    aggregation_interval=timedelta(days=1),
    # One "count" aggregation per trailing window: 1, 30, and 90 days.
    features=[
        Aggregate(
            input_column=Field("TRANSACTION", Int64),
            function="count",
            time_window=TimeWindow(window_size=timedelta(days=window_days)),
        )
        for window_days in (1, 30, 90)
    ],
    description="User transaction totals over a series of time windows, updated daily.",
)
def user_last_transaction_amount(transactions_batch):
    """Build the Snowflake SQL query feeding the transaction-count aggregations.

    Args:
        transactions_batch: Value interpolated into the FROM clause.

    Returns:
        SQL text selecting USER_ID, TRANSACTION, and TIMESTAMP.
    """
    # NOTE(review): assumes the source exposes a TRANSACTION column — confirm.
    # The query body stays flush-left so the emitted string is unchanged.
    counts_query = f"""
SELECT
USER_ID,
TRANSACTION,
TIMESTAMP
FROM
{transactions_batch}
"""
    return counts_query
Custom ETL features (such as arbitrary SQL aggregations)
In general, if you can express your aggregation features using Tecton's Aggregation Engine, you should always do so because of the engine's performance & efficiency, simplicity and accuracy benefits.
However, in some cases, you may not want to use Tecton's Aggregation Engine. The most common reasons are:
- You are migrating an existing feature into Tecton and don't want to make any changes
- You need aggregation functions that Tecton's Engine doesn't support
- You want to precompute several nested layers of aggregations (e.g. calculate the standard-deviation of the max amount of the lifetime transaction number per country)
Here is a simple example:
- Rift
- Spark
- Snowflake
from tecton import batch_feature_view, Attribute
from tecton.types import Int64
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta
# NOTE(review): `TectonTimeConstant` is used below but is not among this
# snippet's visible imports; presumably it comes from `tecton` — confirm and
# add it to the import list.
@batch_feature_view(
    sources=[
        # Widen each run's input: 29 days before the materialization start,
        # up to the materialization end.
        transactions_batch.select_range(
            start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
            end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
        )
    ],
    entities=[user],
    mode="pandas",
    feature_start_time=datetime(2022, 4, 1),
    incremental_backfills=True,
    batch_schedule=timedelta(days=1),
    timestamp_field="timestamp",
    features=[Attribute("distinct_merchant_transaction_count_30d", Int64)],
    ttl=timedelta(days=2),
    description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context):
    """Count distinct merchants per user over the 30 days before `context.end_time`.

    Args:
        transactions_batch: Frame with `user_id`, `merchant`, and `timestamp`
            columns.
        context: Object whose `end_time` attribute marks the exclusive end of
            the window.

    Returns:
        Frame with `user_id`, `distinct_merchant_transaction_count_30d`, and a
        `timestamp` set to one microsecond before `context.end_time`.
    """
    # Local import: the snippet has no module-level pandas import.
    import pandas as pd

    end_time = pd.to_datetime(context.end_time)
    start_time = end_time - pd.Timedelta(days=30)
    # Half-open window [start_time, end_time).
    recent_transactions = transactions_batch[
        (transactions_batch["timestamp"] >= start_time) & (transactions_batch["timestamp"] < end_time)
    ]
    result = (
        recent_transactions.groupby("user_id")
        .agg(distinct_merchant_transaction_count_30d=("merchant", "nunique"))
        .reset_index()
    )
    # Stamp every row just before the window end (presumably so the value
    # falls inside the materialization period — confirm).
    result["timestamp"] = end_time - pd.Timedelta(microseconds=1)
    return result
from tecton import batch_feature_view, Attribute
from tecton.types import Int64
from fraud.data_sources.transactions import transactions_batch
from fraud.entities import user
from datetime import datetime, timedelta
# NOTE(review): `TectonTimeConstant` is used below but is not among this
# snippet's visible imports; presumably it comes from `tecton` — confirm.
@batch_feature_view(
    sources=[
        # Widen each run's input: 29 days before the materialization start,
        # up to the materialization end.
        transactions_batch.select_range(
            start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
            end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
        )
    ],
    entities=[user],
    mode="spark_sql",
    timestamp_field="timestamp",
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2022, 4, 1),
    incremental_backfills=True,
    ttl=timedelta(days=2),
    features=[Attribute("distinct_merchant_transaction_count_30d", Int64)],
    description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context):
    """Build the Spark SQL query counting distinct merchants per user.

    Args:
        transactions_batch: Value interpolated into the FROM clause.
        context: Object whose `end_time` is interpolated into the query.

    Returns:
        SQL text grouping by `user_id`, with each row's `timestamp` set to one
        microsecond before `context.end_time`.
    """
    # The query body stays flush-left so the emitted string is unchanged.
    distinct_merchants_query = f"""
SELECT
user_id,
TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND as timestamp,
COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
FROM {transactions_batch}
GROUP BY
user_id
"""
    return distinct_merchants_query
from tecton import batch_feature_view, Attribute
from tecton.types import Int64
from fraud.data_sources.transactions import transactions_batch
from fraud.entities import user
from datetime import datetime, timedelta
# NOTE(review): `TectonTimeConstant` is used below but is not among this
# snippet's visible imports; presumably it comes from `tecton` — confirm.
@batch_feature_view(
    sources=[
        # Widen each run's input: 29 days before the materialization start,
        # up to the materialization end.
        transactions_batch.select_range(
            start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
            end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
        )
    ],
    entities=[user],
    mode="snowflake_sql",
    timestamp_field="timestamp",
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2022, 4, 1),
    incremental_backfills=True,
    ttl=timedelta(days=2),
    features=[Attribute("distinct_merchant_transaction_count_30d", Int64)],
    description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context):
    """Build the Snowflake SQL query counting distinct merchants per user.

    Args:
        transactions_batch: Value interpolated into the FROM clause.
        context: Object whose `end_time` is interpolated into the query.

    Returns:
        SQL text grouping by `user_id`, with each row's `timestamp` set to one
        microsecond before `context.end_time`.
    """
    # The query body stays flush-left so the emitted string is unchanged.
    distinct_merchants_query = f"""
SELECT
user_id,
TIMESTAMPADD(microsecond, -1, TO_TIMESTAMP('{context.end_time}')) as timestamp,
COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
FROM {transactions_batch}
GROUP BY
user_id
"""
    return distinct_merchants_query
Please review this guide for more details on how to develop custom ETL features in a performant and cost-efficient way.
Full list of parameters
See the API reference for the full list of parameters.