Skip to main content
Version: 0.8

Batch Feature View

A Batch Feature View defines transformations against one or many Batch Data Sources (e.g. Snowflake, Redshift, BigQuery, S3 etc.) and computes features on a schedule. They can also run automatic backfills when they’re first created.

Feature Examples:

  • determining if a user's credit score is over a pre-defined threshold
  • counting the total number of transactions over a time window in batch
  • batch ingesting pre-computed feature values from an existing batch source

Batch Feature Transformations

Batch Feature Views can define row-level transformations for filtering and projection. They can also define aggregations using Tecton's Aggregation Engine or plain SQL.

We will look at them one by one.

Row-Level Transformations

Batch Feature Views can define row-level filtering or projection transformations using Pandas, Snowflake SQL, or Spark

Here is a very simple row-level transformation example against a single data source.

from tecton import batch_feature_view
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta


@batch_feature_view(
sources=[transactions],
entities=[user],
mode="pandas",
batch_schedule=timedelta(days=1),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
def user_last_transaction_amount(transactions):
return transactions[["user_id", "timestamp", "amt"]]

Of course, you can always join multiple batch data sources together:

from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta


# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[FilteredSource(transactions_batch), fraud_users_batch],
entities=[user],
mode="pandas",
feature_start_time=datetime(2022, 5, 1),
batch_schedule=timedelta(days=1),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("user_is_adult", Int64)],
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
fraud_users_batch["dob"] = pd.to_datetime(fraud_users_batch["dob"])

transactions_batch["timestamp"] = pd.to_datetime(transactions_batch["timestamp"])

joined_df = pd.merge(transactions_batch, fraud_users_batch, on="user_id")

joined_df["user_is_adult"] = ((joined_df["timestamp"] - joined_df["dob"]).dt.days > 18 * 365).astype(int)

return joined_df[["user_id", "timestamp", "user_is_adult"]]

Time-Window aggregations

Batch Feature Views can also run aggregation transformations using Tecton's Aggregation Engine:

from tecton import batch_feature_view, FilteredSource, Aggregation, TimeWindow
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta


@batch_feature_view(
sources=[FilteredSource(transactions_batch)],
entities=[user],
mode="pandas",
aggregation_interval=timedelta(days=1),
aggregations=[
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=1))),
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=30))),
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=90))),
],
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("transaction", Int64)],
online=True,
offline=True,
feature_start_time=datetime(2022, 5, 1),
description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
transactions["transaction"] = 1
return transactions[["user_id", "transaction", "timestamp"]]

The aggregation_interval parameter defines the interval by which the window advances (e.g., 1 day, in the example above). Tecton updates feature values whenever the aggregation interval elapses (e.g., daily, in the example above).

Custom ETL features (such as arbitrary SQL aggregations)

In general, if you can express your aggregation features using Tecton's Aggregation Engine, you should always do so because of the engine's performance & efficiency, simplicity and accuracy benefits.

However, in some cases, you may not want to use Tecton's Aggregation Engine. The most common reasons are:

  • You are migrating an existing feature into Tecton and don't want to make any changes
  • You need aggregation functions that Tecton's Engine doesn't support
  • You want to precompute several nested layers of aggregations (e.g. calculate the standard-deviation of the max amount of the lifetime transaction number per country)

Here is a simple example:

from tecton import batch_feature_view, FilteredSource, materialization_context
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta


@batch_feature_view(
sources=[FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))],
entities=[user],
mode="pandas",
feature_start_time=datetime(2022, 4, 1),
incremental_backfills=True,
batch_schedule=timedelta(days=1),
schema=[
Field("user_id", String),
Field("timestamp", Timestamp),
Field("distinct_merchant_transaction_count_30d", Int64),
],
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()):
end_time = pd.to_datetime(context.end_time)
start_time = end_time - pd.Timedelta(days=30)
recent_transactions = transactions_batch[
(transactions_batch["timestamp"] >= start_time) & (transactions_batch["timestamp"] < end_time)
]
result = (
recent_transactions.groupby("user_id")
.agg(distinct_merchant_transaction_count_30d=("merchant", "nunique"))
.reset_index()
)
result["timestamp"] = end_time - pd.Timedelta(microseconds=1)
return result
info

Please review this guide for more details on how to develop custom ETL features in a performant and cost-efficient way.

Materialization to the Online and Offline Feature Store

Online Materialization

You can easily make batch features available for low-latency online retrieval to feed an online model. Simply set online=True. If you want Tecton to backfill the feature, please also define the feature_start_time.

Offline Materialization

Tecton also supports offline materialization. This can speed up some expensive queries considerably. When you set offline=True, Tecton will materialize offline feature data to an offline table according to the Feature View's configured schedule and feature_start_time.

If you don’t materialize Batch Feature Views offline, Tecton will execute your transformation directly against the upstream raw data sources when you use the Tecton SDK to generate offline data. Speaking in SQL terms, a Batch Feature View without offline materialization is simply a “View”. A Batch Feature View with offline materialization is a “Materialized View”.

Offline materialization has additional benefits including:

  • Online-offline skew is minimized because the data in the online and offline store is updated at the same time.
  • Offline feature data is saved so it is resilient to any losses of historical data upstream and training datasets can always be regenerated.

Materialization Job Scheduling Behavior

During the initial backfill of your feature, Tecton tries to minimize the number of backfill jobs in order to drastically reduce the backfill costs. For example, if you define a feature with a batch_schedule of 1 day that needs to backfill 1 year worth of data, you will find that Tecton schedules just ~10 distinct backfill jobs, rather than 365, as you may typically expect. You can modify Tecton's backfill job splitting behavior by setting the max_backfill_interval parameter.

info

If you're used to common Data Engineering tools like Airflow, you may expect Tecton to schedule one backfill job for every batch_schedule interval in your backfill period. For instance, you may expect a feature that has a daily batch_schedule, and that needs to backfilled for the past 1 year, to kick off 365 distinct backfill jobs. This can be very expensive. It is also common practice and the default behavior of most off-the-shelf ETL solutions (like Airflow). You can force Tecton to use this naive backfill mode by setting incremental_backfill to True. Please visit this guide that discusses a valid use case for this mode.

For every steady-state, forward fill, of your feature, Tecton will schedule exactly one materialization job.

Feature Data Timestamp Expectations

Every materialization run is expected to produce feature values for a specific time range. This time range is known as the “materialization window”. The materialization window is different for backfills and incremental runs:

  • During the initial backfill of feature data to the Online and Offline Feature Store, the materialization time window starts with feature_start_time and ends with Tecton’s “current wall clock time” at which the feature’s materialization is enabled.
  • On incremental runs, the materialization time window starts with the previous run’s start_time and ends with start_time + batch_schedule.

Backfill and Incremental Materialization

Tecton only materializes feature values that fall within the materialization time window. It automatically filters out unexpected feature values as shown with the WHERE clause below:

--Tecton applies this filter to the user-provided transformation

SELECT * FROM {batch_feature_view_transformation}
WHERE {timestamp_field} >= {start_time}
AND {timestamp_field} < {end_time}
info

The start time of the window is inclusive and the end time is exclusive. This means that a feature value whose timestamp is exactly equal to the end_time is not part of the window.

Efficient Incremental Materialization

In many cases, incremental materialization runs do not need to process all of the input source's raw data.

For example, an incremental materialization run of a row-level transformation that processes raw data at midnight every day should only look at the event data of the past 24 hours, and not the entire event history.

Automatic filtering using FilteredSource

For convenience, Tecton offers a FilteredSource class that automatically pushes timestamp and partition filtering to the data source.

As a result, your Feature View transformation does not need to manually filter out raw data that's not required for the current materialization window.

Behind the scenes, Tecton will automatically filter the data source’s data based on its timestamp_field and, if applicable, its datetime_partition_columns.

Here is an example that shows how to use the FilteredSource in practice.

from tecton import batch_feature_view, FilteredSource
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta


@batch_feature_view(
sources=[FilteredSource(transactions)],
entities=[user],
mode="pandas",
online=True,
batch_schedule=timedelta(days=1),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
feature_start_time=datetime(2020, 10, 10),
offline=True,
)
def user_last_transaction_amount(transactions):
return transactions[["user_id", "timestamp", "amt"]]

By default, FilteredSource filters for data between context.start_time and context.end_time.

Manual filtering

If FilteredSource isn't an option for you, you can manually filter for the raw data needed to produce feature values on each run by leveraging a context object that Tecton passes into the transformation function. context.start_time and context.end_time are equal to the expected materialization time window as shown in the diagram below:

Materialization Context Window

The example transformation below filters for the required raw data in the WHERE clause.

from tecton import batch_feature_view, materialization_context
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta


@batch_feature_view(
sources=[transactions],
entities=[user],
online=True,
mode="pandas",
batch_schedule=timedelta(days=1),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
feature_start_time=datetime(2020, 10, 10),
offline=True,
)
def user_last_transaction_amount(transactions, context=materialization_context()):
df = transactions[["user_id", "amt", "timestamp"]]
return df[(df["timestamp"] >= context.start_time) & (df["timestamp"] < context.end_time)]
info

In cases where you read from a time-partitioned data source, like a Glue table or partitioned data on S3, you typically will also want to filter by partition columns.

Late Arriving Data

By default, incremental materialization jobs for Batch Feature Views run immediately at the end of the batch schedule period. To override this default, set the data_delay parameter, which is specified in the data source configuration (the batch_config object referenced in the BatchSource object used by the Batch Feature View). data_delay configures how long jobs wait after the end of the batch schedule period before starting. This is typically used to ensure that all data has landed. For example, if a Batch Feature View has a batch_schedule of 1 day and its data source input has data_delay=timedelta(hours=1) set, then incremental batch materialization jobs will run at 01:00 UTC.

If your upstream data delay is unpredictable, you can trigger materialization with an API call. Please follow these instructions

Full list of parameters

See the API reference for the full list of parameters.

Was this page helpful?

🧠 Hi! Ask me anything about Tecton!

Floating button icon