Version: 0.9

Custom ETL Features with Incremental Backfills

If you need to define aggregation features that aren't supported by Tecton's Aggregation Engine, you can develop custom ETL features.

When using custom ETL features, disable Tecton’s optimized backfill by setting incremental_backfills to True. Here's why you need to disable it, and how it works:

  • Default behavior: Tecton runs your query once for the entire time between feature_start_time and the feature registration time, minimizing the number of backfill jobs. This isn't compatible with ETL features that expect every query run to process a fixed time range of data (say, one day's worth).
  • Incremental Backfills: Your query runs separately for each batch_schedule interval in the specified time range, ensuring every query run gets to process a fixed time range of data.

For example, suppose you register a feature view on May 1, 2023, with feature_start_time set to January 1, 2023, so that the feature view is backfilled to the beginning of the year, and batch_schedule set to "1d", so that materialization jobs run daily. Tecton then runs one backfill query per day for the period from January 1, 2023 up to, but not including, May 1, 2023. That period spans 120 days (31 + 28 + 31 + 30), so 120 backfill queries are executed.
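The job count in this example can be reproduced with a few lines of plain Python. This is only a sketch of the arithmetic, not Tecton's scheduling logic; the helper name backfill_windows is hypothetical.

```python
from datetime import datetime, timedelta


def backfill_windows(feature_start_time, registration_time, batch_schedule):
    """Return one half-open [start, end) window per batch_schedule interval.

    Hypothetical helper for illustration; Tecton computes its own job plan.
    """
    windows = []
    start = feature_start_time
    # Only emit windows that end at or before the registration time.
    while start + batch_schedule <= registration_time:
        windows.append((start, start + batch_schedule))
        start += batch_schedule
    return windows


windows = backfill_windows(
    feature_start_time=datetime(2023, 1, 1),
    registration_time=datetime(2023, 5, 1),
    batch_schedule=timedelta(days=1),
)

print(len(windows))  # 120
print(windows[0])    # first window: Jan 1 to Jan 2
print(windows[-1])   # last window: Apr 30 to May 1
```

Each tuple corresponds to one query run under incremental backfills, which is why the end date itself never starts a window.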

[Figure: Materialization Incremental Backfills]

note

To see how many materialization jobs will be created when you apply your changes, check the output of the tecton plan command. Tecton strongly recommends doing this to confirm that the job count matches your expectations and that the cost of the operation is acceptable.

caution

Because they require many jobs, Batch Feature Views with incremental backfills cannot compute features on the fly from the source. They must be materialized before you request training data.

Example

Below are two ways to implement a Batch Feature View that counts the distinct merchants per user over a seven-day period. The first relies on FilteredSource to select the source data; the second filters the source data itself using the materialization_context.

note

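In both examples, the feature timestamps are set to the last microsecond of the materialization window, i.e. context.end_time minus one microsecond (TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND in SQL; end_time - pd.Timedelta(microseconds=1) in the pandas code below). This is necessary because Tecton filters the output of the feature view transformation to the half-open interval [context.start_time, context.end_time), so a row stamped exactly at context.end_time would be dropped.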
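The effect of the half-open interval can be checked with standard-library datetimes. This is a minimal sketch; the in_window helper is hypothetical and only mimics the [context.start_time, context.end_time) filter described in the note.

```python
from datetime import datetime, timedelta


def in_window(ts, start_time, end_time):
    # Half-open interval: start is inclusive, end is exclusive.
    return start_time <= ts < end_time


start_time = datetime(2023, 1, 1)
end_time = datetime(2023, 1, 2)

# Timestamp used in the examples: the last microsecond of the window.
feature_ts = end_time - timedelta(microseconds=1)

print(in_window(feature_ts, start_time, end_time))  # True: the row is kept
print(in_window(end_time, start_time, end_time))    # False: the row would be dropped
```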

Filtered Source

In this example, we use a FilteredSource with a start_time_offset of -6 days to achieve our seven-day aggregate. The materialization window is one day long because the batch_schedule is one day. Offsetting the start time by -6 days extends the window of data that the data source returns for a single materialization query to 7 days.

from datetime import datetime, timedelta

import pandas as pd
from tecton import batch_feature_view, FilteredSource, materialization_context
from tecton.types import Field, String, Timestamp, Int64

# `transactions` (a batch data source) and `user` (an entity) are assumed to be
# defined elsewhere in the feature repository.


@batch_feature_view(
    # Extend each one-day materialization window six days into the past,
    # so that the source returns seven days of data per query.
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode="pandas",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    schema=[Field("USER_ID", String), Field("TIMESTAMP", Timestamp), Field("DISTINCT_MERCHANT_COUNT_7D", Int64)],
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    end_time = pd.to_datetime(context.end_time)
    start_time = end_time - pd.Timedelta(days=7)
    # Keep only rows in the seven-day window [start_time, end_time).
    recent_transactions = transactions[
        (transactions["TIMESTAMP"] >= start_time) & (transactions["TIMESTAMP"] < end_time)
    ]
    # Count the distinct merchants per user.
    result = (
        recent_transactions.groupby("USER_ID").agg(DISTINCT_MERCHANT_COUNT_7D=("MERCHANT", "nunique")).reset_index()
    )
    # Stamp the features at the last microsecond of the materialization window.
    result["TIMESTAMP"] = end_time - pd.Timedelta(microseconds=1)
    return result
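The window arithmetic behind the -6 day offset can be verified without Tecton. The sketch below assumes, as the text above describes, that the offset is added to the start of the one-day materialization window while the end stays unchanged; the variable names are illustrative only.

```python
from datetime import datetime, timedelta

batch_schedule = timedelta(days=1)
start_time_offset = timedelta(days=-6)

# One example materialization window: [Jan 7, Jan 8).
context_start = datetime(2023, 1, 7)
context_end = context_start + batch_schedule

# Range of source data after applying the offset to the window start.
source_start = context_start + start_time_offset
source_end = context_end

print(source_start)               # 2023-01-01 00:00:00
print(source_end - source_start)  # 7 days, 0:00:00
```

This also matches the filter inside the function, where start_time is computed as end_time minus seven days.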

Materialization Context

In this example, we use the materialization context (i.e. the context parameter in our feature view function) to filter for the right time window of raw data needed for each materialization query. That data can then be aggregated to produce the feature values for that time period.

from datetime import datetime, timedelta

import pandas as pd
from tecton import batch_feature_view, materialization_context
from tecton.types import Field, String, Timestamp, Int64

# `transactions` (a batch data source) and `user` (an entity) are assumed to be
# defined elsewhere in the feature repository.


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="pandas",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    schema=[Field("USER_ID", String), Field("TIMESTAMP", Timestamp), Field("DISTINCT_MERCHANT_COUNT_7D", Int64)],
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    # Look back six days before the one-day materialization window,
    # covering seven days of data in total.
    start_time = pd.to_datetime(context.start_time) - pd.Timedelta(days=6)
    end_time = pd.to_datetime(context.end_time)
    # Keep only rows in the seven-day window [start_time, end_time).
    filtered_transactions = transactions[
        (transactions["TIMESTAMP"] >= start_time) & (transactions["TIMESTAMP"] < end_time)
    ]
    # Count the distinct merchants per user.
    result = (
        filtered_transactions.groupby("USER_ID").agg(DISTINCT_MERCHANT_COUNT_7D=("MERCHANT", "nunique")).reset_index()
    )
    # Stamp the features at the last microsecond of the materialization window.
    result["TIMESTAMP"] = end_time - pd.Timedelta(microseconds=1)
    return result
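To see what a single materialization query computes, the same logic can be traced on a handful of made-up rows using only the standard library. This is a sketch for illustration: the function name, the tuple layout, and the sample data are all hypothetical and not part of Tecton.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def distinct_merchant_count_7d(rows, window_start, window_end):
    """Mimic one query run: rows are (user_id, merchant, timestamp) tuples."""
    start_time = window_start - timedelta(days=6)  # look back six extra days
    end_time = window_end
    merchants = defaultdict(set)
    for user_id, merchant, ts in rows:
        if start_time <= ts < end_time:  # half-open seven-day window
            merchants[user_id].add(merchant)
    feature_ts = end_time - timedelta(microseconds=1)
    return {user: (feature_ts, len(seen)) for user, seen in merchants.items()}


rows = [
    ("u1", "m1", datetime(2023, 1, 1, 12)),  # before the window: ignored
    ("u1", "m2", datetime(2023, 1, 2)),      # exactly at the window start: counted
    ("u1", "m2", datetime(2023, 1, 5)),      # repeat merchant: counted once
    ("u1", "m3", datetime(2023, 1, 8, 23)),  # inside the window: counted
    ("u2", "m4", datetime(2023, 1, 3)),      # inside the window: counted
    ("u2", "m1", datetime(2023, 1, 9)),      # exactly at the window end: ignored
]

# Materialization window [Jan 8, Jan 9) reads data from [Jan 2, Jan 9).
features = distinct_merchant_count_7d(rows, datetime(2023, 1, 8), datetime(2023, 1, 9))
print(features["u1"][1])  # 2
print(features["u2"][1])  # 1
```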

Performance Considerations

While incremental backfills simplify your transformation logic, they can also result in many more backfill queries, which can become inefficient at scale.

Leveraging time filters on your data source is especially important to ensure that each query operates only over the data that it needs. Although you can accomplish this with your own filter logic within the feature view using the materialization_context, Tecton recommends that you leverage FilteredSource. Using a FilteredSource ensures that data sources are filtered correctly and any configured partitions are used for improved filter performance.
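A rough back-of-the-envelope comparison shows why the time filter matters. The numbers below are assumptions chosen for illustration (120 daily jobs, as in the earlier backfill example, and a hypothetical source holding 365 days of history); they are not measurements of any real workload.

```python
def days_scanned(num_jobs, days_read_per_job):
    # Total days of source data read across all backfill jobs.
    return num_jobs * days_read_per_job


num_jobs = 120             # one job per day, Jan 1 to Apr 30
window_days = 7            # each job needs seven days of data
source_history_days = 365  # hypothetical amount of data in the source

filtered = days_scanned(num_jobs, window_days)
unfiltered = days_scanned(num_jobs, source_history_days)

print(filtered)    # 840
print(unfiltered)  # 43800
```

Under these assumptions, unfiltered queries would read roughly 52 times more data than filtered ones, and the gap grows with the amount of history in the source.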
