Batch Feature View
A Batch Feature View (BFV) generates features from upstream data on a scheduled cadence. BFVs define transformations against one or more BatchSource Data Sources (e.g. Snowflake, Redshift, BigQuery, S3), generate features on the configured schedule, and publish them to the online and offline stores. They can also run automatic backfills when they are first created.
Common Examples
- determining if a user's credit score is over a pre-defined threshold
- counting the total number of transactions over a time window
- combining data sources to determine a user's age at the time of a transaction
This example feature view uses a transactions data source to calculate the mean transaction amount for a user over 1, 3, and 7 days:
from tecton import batch_feature_view, Aggregate
from tecton.types import Field, Float64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions  # example repo paths, as in the samples below

@batch_feature_view(
description="User transaction metrics over 1, 3, and 7 days",
sources=[transactions],
entities=[user],
mode="pandas",
batch_schedule=timedelta(days=1),
timestamp_field="timestamp",
aggregation_interval=timedelta(days=1),
features=[
Aggregate(input_column=Field("amount", Float64), function="mean", time_window=timedelta(days=1)),
Aggregate(input_column=Field("amount", Float64), function="mean", time_window=timedelta(days=3)),
Aggregate(input_column=Field("amount", Float64), function="mean", time_window=timedelta(days=7)),
],
online=True,
offline=True,
feature_start_time=datetime(2020, 1, 1),
)
def user_transaction_metrics(transactions):
return transactions[["user_id", "timestamp", "amount"]]
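Once the feature view has been applied to a workspace (see Testing and Deployment below), the generated features can be read back for training or inspection. A minimal notebook sketch, assuming a workspace named "prod" and the retrieval methods available in recent versions of the Tecton SDK:

import tecton
import pandas as pd

ws = tecton.get_workspace("prod")  # assumption: your workspace name
fv = ws.get_feature_view("user_transaction_metrics")

# Offline features for a set of training events (one row per entity key + timestamp).
events = pd.DataFrame(
    {
        "user_id": ["user_1"],
        "timestamp": [pd.Timestamp("2022-06-01")],
    }
)
training_df = fv.get_features_for_events(events).to_pandas()
print(training_df.head())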
Overview
To define a Batch Feature View, you'll need to be familiar with the following concepts:
Input Data Sources
Batch Feature Views operate on one or more input batch data sources specified in
the sources parameter. You need to determine what data sources are required to
define your desired features.
For information about connecting data sources to data, see Connect to Data.
Feature Definition
Batch features are created by applying the @batch_feature_view decorator to a Python transformation function. The transformation function can operate in one of several modes (e.g. pandas), depending on your cluster's compute. The @batch_feature_view decorator accepts configuration parameters that control materialization, aggregations, and other behavior. Each of these is covered in the implementation guide below.
Testing and Deployment
You should test your feature views to ensure the definitions and configuration are correct. After you've defined your Batch Feature View, you can write tests for it following the testing guide.
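For example, a minimal pytest-style check of the row-level logic. This is a sketch, assuming the projection is factored into a plain helper function (select_transaction_columns is hypothetical) so it can be tested without a Tecton cluster:

import pandas as pd

# Hypothetical helper holding the same row-level projection used by
# user_transaction_metrics above.
def select_transaction_columns(df: pd.DataFrame) -> pd.DataFrame:
    return df[["user_id", "timestamp", "amount"]]

def test_select_transaction_columns():
    raw = pd.DataFrame(
        {
            "user_id": ["user_1"],
            "timestamp": [pd.Timestamp("2022-01-01")],
            "amount": [42.0],
            "merchant": ["m_1"],  # extra column that should be dropped
        }
    )
    out = select_transaction_columns(raw)
    assert list(out.columns) == ["user_id", "timestamp", "amount"]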
Once your features are defined and tested, you can deploy them using tecton apply.
Implementation Guide
Define a Batch Feature View
- Declare one or more BatchSources that point to your dataset.
- Define your feature transformation logic using the @batch_feature_view decorator.
Batch Feature Transformations
Batch Feature Views can define row-level transformations for filtering and projection.
Row-Level Transformations
Batch Feature Views can define row-level filtering or projection transformations using Pandas, Spark SQL, or Snowflake SQL.
Below is an example that takes in a Batch Data Source transactions_batch and then does row-level filtering to select only three columns: user_id, timestamp, and amt. The batch_schedule parameter tells Tecton to schedule a batch materialization job once a day.
Rift:
from tecton import batch_feature_view, Attribute
from tecton.types import Float64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="pandas",
batch_schedule=timedelta(days=1),
timestamp_field="timestamp",
features=[
Attribute("amt", Float64),
],
)
def user_last_transaction_amount(transactions_batch):
return transactions_batch[["user_id", "timestamp", "amt"]]
Spark:

from tecton import batch_feature_view, Attribute
from tecton.types import Float64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="spark_sql",
batch_schedule=timedelta(days=1),
timestamp_field="TIMESTAMP",
features=[
Attribute("AMT", Float64),
],
)
def user_last_transaction_amount(transactions_batch):
return f"""
SELECT
USER_ID,
AMT,
TIMESTAMP
FROM
{transactions_batch}
"""
Rift (Snowflake SQL):

from tecton import batch_feature_view, Attribute
from tecton.types import Float64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="snowflake_sql",
batch_schedule=timedelta(days=1),
timestamp_field="TIMESTAMP",
features=[
Attribute("AMT", Float64),
],
)
def user_last_transaction_amount(transactions_batch):
return f"""
SELECT
USER_ID,
AMT,
TIMESTAMP
FROM
{transactions_batch}
"""
Batch Feature Views can also perform row-level transformations on multiple Data Sources joined together. In the example below, the two data sources transactions_batch and fraud_users_batch are combined into a single Feature View: date-of-birth information from fraud_users_batch is joined with transaction information from transactions_batch to determine whether a user was an adult at the time of a given transaction.
Rift:
from tecton import batch_feature_view, Attribute
from tecton.types import Int32
import pandas as pd
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[transactions_batch, fraud_users_batch],
entities=[user],
mode="pandas",
batch_schedule=timedelta(days=1),
timestamp_field="timestamp",
features=[
Attribute("user_is_adult", Int32),
],
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
fraud_users_batch["dob"] = pd.to_datetime(fraud_users_batch["dob"])
transactions_batch["timestamp"] = pd.to_datetime(transactions_batch["timestamp"])
joined_df = pd.merge(transactions_batch, fraud_users_batch, on="user_id")
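    # Approximate adulthood check: more than 18 * 365 days between the user's
    # date of birth and the transaction timestamp.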
    joined_df["user_is_adult"] = ((joined_df["timestamp"] - joined_df["dob"]).dt.days > 18 * 365).astype("int32")
return joined_df[["user_id", "timestamp", "user_is_adult"]]
Spark:

from tecton import batch_feature_view, Attribute
from tecton.types import Int32
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[transactions_batch, fraud_users_batch],
entities=[user],
mode="spark_sql",
batch_schedule=timedelta(days=1),
timestamp_field="timestamp",
features=[
Attribute("user_is_adult", Int32),
],
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
return f"""
select
timestamp,
t.user_id,
datediff(timestamp, to_date(dob)) > (18*365) as user_is_adult
from {transactions_batch} t join {fraud_users_batch} u on t.user_id=u.user_id
"""
Rift (Snowflake SQL):

from tecton import batch_feature_view, Attribute
from tecton.types import Int32
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[transactions_batch, fraud_users_batch],
entities=[user],
mode="snowflake_sql",
batch_schedule=timedelta(days=1),
timestamp_field="timestamp",
features=[
Attribute("user_is_adult", Int32),
],
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
return f"""
select
timestamp,
t.user_id,
        datediff(day, to_date(dob), timestamp) > (18*365) as user_is_adult
from {transactions_batch} t join {fraud_users_batch} u on t.user_id=u.user_id
"""
Batch Feature Aggregations
Batch Feature Views can also define aggregations using Tecton's Aggregation Engine or plain SQL.
Time-Window Aggregations
Batch Feature Views can also run aggregations over multiple rows in a dataset using Tecton's Aggregation Engine.
The example below takes the same transactions_batch data source, extracts three columns (user_id, transaction, timestamp), and generates partial aggregates that are rolled up at retrieval time to produce 3 features: transaction_count_1d_1d, transaction_count_30d_1d, and transaction_count_90d_1d. (Tecton names each feature column_function_window_interval.)
The aggregation_interval parameter defines the interval by which the aggregation time window advances (e.g., 1 day in the example below).
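Conceptually, Tecton stores one partial aggregate per aggregation interval and combines the partials at read time. A toy illustration of that rollup in plain Python (not Tecton code):

# One partial count per 1-day aggregation interval.
daily_counts = {"2022-05-01": 3, "2022-05-02": 0, "2022-05-03": 5}

# At retrieval time, a 3-day window is served by summing its three partials.
transaction_count_3d = sum(daily_counts.values())
assert transaction_count_3d == 8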
Rift:
from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="pandas",
aggregation_interval=timedelta(days=1),
timestamp_field="timestamp",
features=[
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=1)),
),
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=30)),
),
Aggregate(
input_column=Field("transaction", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=90)),
),
],
description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
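    # Add a literal 1 per row so the count aggregation counts occurrences.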
transactions["transaction"] = 1
return transactions[["user_id", "transaction", "timestamp"]]
Spark:

from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="spark_sql",
aggregation_interval=timedelta(days=1),
timestamp_field="TIMESTAMP",
features=[
Aggregate(
input_column=Field("TRANSACTION", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=1)),
),
Aggregate(
input_column=Field("TRANSACTION", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=30)),
),
Aggregate(
input_column=Field("TRANSACTION", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=90)),
),
],
description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions_batch):
    return f"""
        SELECT
            USER_ID,
            1 AS TRANSACTION,
            TIMESTAMP
        FROM
            {transactions_batch}
        """
Rift (Snowflake SQL):

from tecton import batch_feature_view, Aggregate, TimeWindow
from tecton.types import Field, Int64
from datetime import datetime, timedelta
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="snowflake_sql",
aggregation_interval=timedelta(days=1),
timestamp_field="TIMESTAMP",
features=[
Aggregate(
input_column=Field("TRANSACTION", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=1)),
),
Aggregate(
input_column=Field("TRANSACTION", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=30)),
),
Aggregate(
input_column=Field("TRANSACTION", Int64),
function="count",
time_window=TimeWindow(window_size=timedelta(days=90)),
),
],
description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions_batch):
    return f"""
        SELECT
            USER_ID,
            1 AS TRANSACTION,
            TIMESTAMP
        FROM
            {transactions_batch}
        """
Custom ETL features (such as arbitrary SQL aggregations)
In general, if you can express your aggregation features using Tecton's Aggregation Engine, you should do so: the engine brings performance, efficiency, simplicity, and accuracy benefits.
However, in some cases, you may not want to use Tecton's Aggregation Engine. The most common reasons are:
- You are migrating an existing feature into Tecton and don't want to make any changes
- You need aggregation functions that Tecton's Engine doesn't support
- You want to precompute several nested layers of aggregations (e.g. calculate the standard deviation of the maximum lifetime transaction amount per country)
Here is a simple example:
Rift:
from tecton import batch_feature_view, Attribute, TectonTimeConstant
from tecton.types import Int64
import pandas as pd
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[
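        # select_range widens the input window: 29 extra days plus the 1-day
        # materialization window give each daily job a full 30 days of input.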
transactions_batch.select_range(
start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
)
],
entities=[user],
mode="pandas",
feature_start_time=datetime(2022, 4, 1),
incremental_backfills=True,
batch_schedule=timedelta(days=1),
timestamp_field="timestamp",
features=[Attribute("distinct_merchant_transaction_count_30d", Int64)],
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context):
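    # `context` describes this job's materialization window. With
    # incremental_backfills=True, each daily job recomputes the trailing
    # 30-day aggregate as of its own window end.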
end_time = pd.to_datetime(context.end_time)
start_time = end_time - pd.Timedelta(days=30)
recent_transactions = transactions_batch[
(transactions_batch["timestamp"] >= start_time) & (transactions_batch["timestamp"] < end_time)
]
result = (
recent_transactions.groupby("user_id")
.agg(distinct_merchant_transaction_count_30d=("merchant", "nunique"))
.reset_index()
)
result["timestamp"] = pd.to_datetime(context.end_time_inclusive)
return result
Spark:

from tecton import batch_feature_view, Attribute, TectonTimeConstant
from tecton.types import Int64
from fraud.data_sources.transactions import transactions_batch
from fraud.entities import user
from datetime import datetime, timedelta
@batch_feature_view(
sources=[
transactions_batch.select_range(
start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
)
],
entities=[user],
mode="spark_sql",
feature_start_time=datetime(2022, 4, 1),
incremental_backfills=True,
batch_schedule=timedelta(days=1),
timestamp_field="timestamp",
features=[Attribute("distinct_merchant_transaction_count_30d", Int64)],
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context):
return f"""
SELECT
user_id,
TO_TIMESTAMP("{context.end_time_inclusive}") as timestamp,
COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
FROM {transactions_batch}
GROUP BY
user_id
"""
Rift (Snowflake SQL):

from tecton import batch_feature_view, Attribute, TectonTimeConstant
from tecton.types import Int64
from fraud.data_sources.transactions import transactions_batch
from fraud.entities import user
from datetime import datetime, timedelta
@batch_feature_view(
sources=[
transactions_batch.select_range(
start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
)
],
entities=[user],
mode="snowflake_sql",
feature_start_time=datetime(2022, 4, 1),
incremental_backfills=True,
batch_schedule=timedelta(days=1),
timestamp_field="timestamp",
features=[Attribute("distinct_merchant_transaction_count_30d", Int64)],
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context):
return f"""
SELECT
user_id,
TO_TIMESTAMP('{context.end_time_inclusive}') as timestamp,
COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
FROM {transactions_batch}
GROUP BY
user_id
"""
Review this guide for more details on how and why to use incremental_backfills=True and how to develop custom ETL features in a performant and cost-efficient way.
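To build intuition for what each incremental job sees, here is a small illustration in plain Python using the parameters from the example above (the exact timestamp conventions are a simplification; treat this as a sketch):

from datetime import datetime, timedelta

feature_start = datetime(2022, 4, 1)
batch_schedule = timedelta(days=1)

# With incremental_backfills=True, Tecton runs one job per batch_schedule
# interval instead of a single large backfill job.
for i in range(3):  # the first three backfill jobs
    window_start = feature_start + i * batch_schedule  # context.start_time
    window_end = window_start + batch_schedule  # context.end_time
    input_start = window_start - timedelta(days=29)  # select_range start
    print(f"job {i}: reads [{input_start:%Y-%m-%d} .. {window_end:%Y-%m-%d}), features stamped at the window end")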
Additional Feature Definition Notes
Full List of Parameters
See the API reference for the full list of parameters.
Supported Modes
Batch Feature Views support the following compute modes:
| Mode | Description | Supported Compute |
|---|---|---|
| "pandas" | Local testing and lightweight pipelines | Rift, Notebooks |
| "python" | Python-based transformations | Rift, Notebooks |
| "spark_sql" | Scalable Spark-based transformations | Databricks, EMR |
| "bigquery_sql" | SQL transformations in BigQuery | Rift |
| "snowflake_sql" | SQL transformations in Snowflake | Rift |
Feature Naming Requirements
See Feature Naming Requirements for naming rules and examples.
What's Next
See more Batch Feature View examples.
Now that you understand and can implement Batch Feature Views, here are some good next steps:
- Explore Feature Services to see how Feature Views are bundled and served.
- Set up Materialization Monitoring to keep track of data freshness and health.
- Read about Testing Batch Features to validate feature logic before deploying.
- Learn about feature design patterns.
Want to use your Batch Feature View in production? Head over to Deploying Features to Production.