Simplifying Custom Batch Aggregations with Incremental Backfills
While incremental backfills simplify your transformation logic, they can also result in many more backfill jobs, which can become inefficient at scale. For example, with a daily batch_schedule and a feature_start_time one year in the past, applying the Feature View would create on the order of 365 backfill jobs. We recommend checking the output of tecton plan to see how many materialization jobs will be created when your changes are applied.
Because so many jobs would be needed to compute features on the fly, Batch Feature Views with incremental backfills must materialize feature data to the Offline Feature Store in order to be fetched offline.
When Tecton's built-in aggregations are not an option, defining your own aggregation features can be simplified by configuring backfills to execute incrementally for each batch_schedule using incremental_backfills=True.
In this case, when Tecton backfills your feature, it will execute your query once for every batch_schedule interval between the specified feature_start_time and the feature registration time, as opposed to executing the provided query a single time against the entire set of raw data. This allows each job to be responsible for producing a single time-window aggregation.
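To make the scheduling concrete, here is a minimal sketch in plain Python (not a Tecton API) of the per-job intervals that would be backfilled with a daily batch_schedule; the registration date is a hypothetical placeholder:
from datetime import datetime, timedelta

feature_start_time = datetime(2020, 10, 10)
registration_time = datetime(2020, 10, 15)  # hypothetical date the Feature View is applied
batch_schedule = timedelta(days=1)

# One backfill job is scheduled per batch_schedule interval; each job sees that
# interval's bounds as context.start_time (inclusive) and context.end_time (exclusive).
start = feature_start_time
while start < registration_time:
    end = start + batch_schedule
    print(f"backfill job: start_time={start:%Y-%m-%d} end_time={end:%Y-%m-%d}")
    start = end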
The context object can be used to filter for the right time window of raw data needed for each transformation run. That data can then be aggregated to produce the feature values for that time period.
- Spark
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}") - INTERVAL 6 DAYS
            AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        GROUP BY USER_ID
        """
- Snowflake
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP('{context.start_time}') - INTERVAL '6 DAYS'
            AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
        GROUP BY USER_ID
        """
The timestamps of the features are also set to the end of the aggregation window: TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND (context.end_time is not inclusive).
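For illustration, here is that timestamp arithmetic as a minimal sketch in plain Python, using a hypothetical window end date:
from datetime import datetime, timedelta

end_time = datetime(2020, 10, 11)  # exclusive upper bound of the materialization window
feature_timestamp = end_time - timedelta(microseconds=1)
print(feature_timestamp)  # 2020-10-10 23:59:59.999999, just inside the window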
For this example, you could also use a FilteredSource with a start_time_offset to replace the WHERE clause. This is often helpful in cases with complicated partition filtering.
- Spark
from tecton import batch_feature_view, materialization_context, FilteredSource
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode="spark_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
        """
- Snowflake
from tecton import batch_feature_view, materialization_context, FilteredSource
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
        """