Version: 0.8

Batch Feature View

A Batch Feature View defines transformations against one or more Batch Data Sources (e.g., Snowflake, Redshift, BigQuery, or S3) and computes features on a schedule. It can also run an automatic backfill when it is first created.

Feature Examples:

  • determining if a user's credit score is over a pre-defined threshold
  • counting the total number of transactions over a time window in batch
  • batch ingesting pre-computed feature values from an existing batch source

Batch Feature Transformations

Batch Feature Views can define row-level transformations for filtering and projection. They can also define aggregations using Tecton's Aggregation Engine or plain SQL.

We will look at them one by one.

Row-Level Transformations

Batch Feature Views can define row-level filtering or projection transformations using Pandas, Snowflake SQL, or Spark.

Here is a very simple row-level transformation example against a single data source.

from tecton import batch_feature_view
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta


# `transactions` is a Batch Data Source and `user` is an Entity defined elsewhere in the feature repo.
@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="pandas",
    batch_schedule=timedelta(days=1),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
def user_last_transaction_amount(transactions):
    return transactions[["user_id", "timestamp", "amt"]]
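
Because mode="pandas", the transformation receives each source as a pandas DataFrame and returns a DataFrame containing the declared schema columns. As a rough, standalone illustration with made-up sample data (plain pandas, not a Tecton API call), the body above behaves like this:

import pandas as pd

# Hypothetical sample of the transactions source.
transactions_df = pd.DataFrame(
    {
        "user_id": ["user_1", "user_2"],
        "timestamp": pd.to_datetime(["2022-05-01 10:00:00", "2022-05-01 11:30:00"]),
        "amt": [25.0, 102.5],
        "merchant": ["merchant_1", "merchant_2"],  # extra column that the projection drops
    }
)

# Same projection as the feature view body: keep only the schema columns.
features = transactions_df[["user_id", "timestamp", "amt"]]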

Of course, you can always join multiple batch data sources together:

from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta

import pandas as pd


# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction.
@batch_feature_view(
    sources=[FilteredSource(transactions_batch), fraud_users_batch],
    entities=[user],
    mode="pandas",
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("user_is_adult", Int64)],
    description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
    fraud_users_batch["dob"] = pd.to_datetime(fraud_users_batch["dob"])
    transactions_batch["timestamp"] = pd.to_datetime(transactions_batch["timestamp"])

    # Join each transaction to the user's profile and compare the transaction time to the
    # user's date of birth (approximating 18 years as 18 * 365 days).
    joined_df = pd.merge(transactions_batch, fraud_users_batch, on="user_id")
    joined_df["user_is_adult"] = ((joined_df["timestamp"] - joined_df["dob"]).dt.days > 18 * 365).astype(int)

    return joined_df[["user_id", "timestamp", "user_is_adult"]]

Time-Window aggregations

Batch Feature Views can also run aggregation transformations using Tecton's Aggregation Engine:

from tecton import batch_feature_view, FilteredSource, Aggregation, TimeWindow
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="pandas",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=1))),
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=30))),
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=90))),
    ],
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("transaction", Int64)],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
    # Emit a literal 1 per transaction; Tecton's Aggregation Engine counts it over each window.
    transactions["transaction"] = 1
    return transactions[["user_id", "transaction", "timestamp"]]

The aggregation_interval parameter defines the interval by which the window advances (e.g., 1 day, in the example above). Tecton updates feature values whenever the aggregation interval elapses (e.g., daily, in the example above).
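
To make the windowing concrete, here is a plain-pandas sketch of how a trailing-window count is re-evaluated each time a one-day aggregation interval elapses. The sample data, the 3-day window, and the evaluation dates are made up for brevity (the Feature View above uses 1-, 30-, and 90-day windows); this is standalone pandas, not a Tecton API:

import pandas as pd

# Hypothetical transaction events for one user.
events = pd.DataFrame(
    {
        "user_id": ["u1"] * 5,
        "timestamp": pd.to_datetime(
            ["2022-05-01 08:00", "2022-05-02 09:30", "2022-05-02 17:45", "2022-05-04 12:00", "2022-05-05 23:00"]
        ),
    }
)

aggregation_interval = pd.Timedelta(days=1)
window_size = pd.Timedelta(days=3)

# Re-evaluate the trailing 3-day count at each interval boundary, like a daily-updated feature.
for window_end in pd.date_range("2022-05-03", "2022-05-07", freq=aggregation_interval):
    in_window = events[(events["timestamp"] >= window_end - window_size) & (events["timestamp"] < window_end)]
    print(window_end.date(), len(in_window))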

Custom ETL features (such as arbitrary SQL aggregations)

In general, if you can express your aggregation features using Tecton's Aggregation Engine, you should do so, because of the engine's performance, efficiency, simplicity, and accuracy benefits.

However, in some cases, you may not want to use Tecton's Aggregation Engine. The most common reasons are:

  • You are migrating an existing feature into Tecton and don't want to make any changes
  • You need aggregation functions that Tecton's Engine doesn't support
  • You want to precompute several nested layers of aggregations (e.g., the per-country standard deviation of each user's maximum lifetime transaction amount), as illustrated in the sketch below
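
For illustration, here is a plain-pandas sketch of such a nested aggregation over a made-up transactions DataFrame (standalone pandas, not a Tecton API):

import pandas as pd

# Hypothetical sample: one row per transaction, with the user's country attached.
transactions = pd.DataFrame(
    {
        "user_id": ["u1", "u1", "u2", "u3", "u3", "u4"],
        "country": ["US", "US", "US", "DE", "DE", "DE"],
        "amt": [10.0, 25.0, 5.0, 7.0, 12.0, 40.0],
    }
)

# Layer 1: each user's maximum lifetime transaction amount.
max_amt_per_user = transactions.groupby(["country", "user_id"])["amt"].max().reset_index()

# Layer 2: standard deviation of those per-user maxima within each country.
std_per_country = max_amt_per_user.groupby("country")["amt"].std().rename("std_of_max_lifetime_amt")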

Here is a simple example:

from tecton import batch_feature_view, FilteredSource, materialization_context
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta

import pandas as pd


@batch_feature_view(
    sources=[FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))],
    entities=[user],
    mode="pandas",
    feature_start_time=datetime(2022, 4, 1),
    incremental_backfills=True,
    batch_schedule=timedelta(days=1),
    schema=[
        Field("user_id", String),
        Field("timestamp", Timestamp),
        Field("distinct_merchant_transaction_count_30d", Int64),
    ],
    ttl=timedelta(days=2),
    description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()):
    # Aggregate over the 30 days ending at the current materialization window's end time.
    end_time = pd.to_datetime(context.end_time)
    start_time = end_time - pd.Timedelta(days=30)
    recent_transactions = transactions_batch[
        (transactions_batch["timestamp"] >= start_time) & (transactions_batch["timestamp"] < end_time)
    ]
    result = (
        recent_transactions.groupby("user_id")
        .agg(distinct_merchant_transaction_count_30d=("merchant", "nunique"))
        .reset_index()
    )
    # Stamp the feature values with the last instant inside the materialization window.
    result["timestamp"] = end_time - pd.Timedelta(microseconds=1)
    return result

Please review this guide for more details on how to develop custom ETL features in a performant and cost-efficient way.

Full list of parameters

See the API reference for the full list of parameters.
