Custom ETL Features with Incremental Backfills
If you need to define aggregation features that aren't supported by Tecton's Aggregation Engine, you can develop custom ETL features.
When using custom ETL features, disable Tecton’s optimized backfill by setting
incremental_backfills to True. Here's why you need to disable it, and how it
works:
- Default behavior: Tecton runs your query once for the entire time range between feature_start_time and the feature registration time, minimizing the number of backfill jobs (as explained here). This isn't compatible with ETL features that expect to process a fixed time range of data (say, one day's worth) on every single query run.
- Incremental backfills: Your query runs separately for each batch_schedule interval in the specified time range, so every query run processes a fixed time range of data.
To illustrate this with an example, let's say you register a feature view on May
1, 2023. You set the feature_start_time
as January 1, 2023, indicating that
you want to backfill the feature view all the way back to the beginning of the
year. Additionally, you define the feature's batch_schedule
as "1d,"
instructing Tecton to run materialization jobs on a daily basis. Consequently,
Tecton will run one backfill query for your feature view for each day in the
period between January 1, 2023, and May 1, 2023 (excluding the end date). This
timeframe covers 120 days, resulting in a total of 120 backfill queries.
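As a quick sanity check, you can estimate the expected number of incremental backfill jobs yourself before applying. Here is a minimal sketch, assuming the daily batch_schedule from the example above (the variable names are just for illustration):

from datetime import datetime, timedelta

feature_start_time = datetime(2023, 1, 1)  # start of the backfill window
registration_time = datetime(2023, 5, 1)   # when the feature view is applied
batch_schedule = timedelta(days=1)

# One backfill query per batch_schedule interval in [feature_start_time, registration_time)
num_backfill_jobs = (registration_time - feature_start_time) // batch_schedule
print(num_backfill_jobs)  # 120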
To determine how many materialization jobs will be created when you apply your
changes, we strongly recommend checking the output of the tecton plan command.
The plan output shows the expected job count and helps you confirm that
everything is configured as expected and that the cost of the operation is
acceptable.
Because so many jobs are needed, Batch Feature Views with incremental backfills cannot compute features on the fly from the source; they must be materialized before you can request training data.
Example
Below are two examples of how to implement a Batch Feature View that computes a
distinct merchant count over a seven-day period. The first example uses a custom
Data Source filter to accomplish this. The second example uses the context
parameter, which includes the materialization window's start_time and end_time.
In both examples, the timestamps of the features are set to the end of the
materialization window: TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND.
This is done because Tecton filters the output of the feature view transformation
to the interval [context.start_time, context.end_time), which excludes
context.end_time itself.
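To see why the one-microsecond offset matters, here is a minimal sketch of the timestamp arithmetic; the window values below are illustrative only:

import pandas as pd

# Illustrative one-day materialization window
start_time = pd.Timestamp("2023-01-01")
end_time = pd.Timestamp("2023-01-02")

# A feature timestamped exactly at end_time would be dropped by the half-open
# output filter [start_time, end_time), so the examples subtract one microsecond.
feature_timestamp = end_time - pd.Timedelta(microseconds=1)
assert start_time <= feature_timestamp < end_time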
Custom Data Source Filter
In this example, we use a custom Data Source filter with a start_time offset of
-6 days to achieve our seven-day aggregate. The materialization window is one
day long, since the batch_schedule is one day. Offsetting the start time by -6
days means that the total time window of data returned by the data source for a
single materialization query is seven days long.
- Rift
- Spark
- Snowflake
import pandas as pd

from tecton import batch_feature_view, Attribute, TectonTimeConstant
from tecton.types import Field, Int64
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[
        transactions.select_range(
            start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=6),
            end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
        ),
    ],
    entities=[user],
    mode="pandas",
    online=True,
    batch_schedule=timedelta(days=1),
    timestamp_field="TIMESTAMP",
    features=[Attribute("DISTINCT_MERCHANT_COUNT_7D", Int64)],
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context):
    # The data source already returns seven days of data (a one-day window offset
    # by -6 days), so filtering to [end_time - 7d, end_time) captures the full window.
    end_time = pd.to_datetime(context.end_time)
    start_time = end_time - pd.Timedelta(days=7)
    recent_transactions = transactions[
        (transactions["TIMESTAMP"] >= start_time) & (transactions["TIMESTAMP"] < end_time)
    ]
    result = (
        recent_transactions.groupby("USER_ID").agg(DISTINCT_MERCHANT_COUNT_7D=("MERCHANT", "nunique")).reset_index()
    )
    # Timestamp the features at the end of the materialization window, minus one
    # microsecond, so rows survive the [context.start_time, context.end_time) output filter.
    result["TIMESTAMP"] = end_time - pd.Timedelta(microseconds=1)
    return result
from tecton import batch_feature_view, Attribute, TectonTimeConstant
from tecton.types import Field, Int64
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[
        transactions.select_range(
            start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=6),
            end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
        ),
    ],
    entities=[user],
    mode="spark_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    timestamp_field="TIMESTAMP",
    features=[Attribute("DISTINCT_MERCHANT_COUNT_7D", Int64)],
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
        """
from tecton import batch_feature_view, Attribute, TectonTimeConstant
from tecton.types import Field, Int64
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[
        transactions.select_range(
            start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=6),
            end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
        ),
    ],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    timestamp_field="TIMESTAMP",
    features=[Attribute("DISTINCT_MERCHANT_COUNT_7D", Int64)],
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
        """
Materialization Context
In this example, we use the materialization context (i.e. the context
parameter in our feature view function) to filter for the right time window of
raw data needed for each materialization query. That data can then be aggregated
to produce the feature values for that time period.
- Rift
- Spark
- Snowflake
import pandas as pd

from tecton import batch_feature_view, Attribute
from tecton.types import Field, Int64
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="pandas",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    timestamp_field="TIMESTAMP",
    features=[Attribute("DISTINCT_MERCHANT_COUNT_7D", Int64)],
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context):
    # Extend the one-day materialization window back by six days to cover a full
    # seven-day aggregation window, then filter the raw data to that range.
    start_time = pd.to_datetime(context.start_time) - pd.Timedelta(days=6)
    end_time = pd.to_datetime(context.end_time)
    filtered_transactions = transactions[
        (transactions["TIMESTAMP"] >= start_time) & (transactions["TIMESTAMP"] < end_time)
    ]
    result = (
        filtered_transactions.groupby("USER_ID").agg(DISTINCT_MERCHANT_COUNT_7D=("MERCHANT", "nunique")).reset_index()
    )
    # Timestamp features at the end of the window, minus one microsecond, so rows
    # survive the [context.start_time, context.end_time) output filter.
    result["TIMESTAMP"] = end_time - pd.Timedelta(microseconds=1)
    return result
from tecton import batch_feature_view, Attribute
from tecton.types import Field, Int64
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    timestamp_field="TIMESTAMP",
    features=[Attribute("DISTINCT_MERCHANT_COUNT_7D", Int64)],
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}") - INTERVAL 6 DAYS
            AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        GROUP BY USER_ID
        """
from tecton import batch_feature_view, Attribute
from tecton.types import Field, Int64
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    timestamp_field="TIMESTAMP",
    features=[Attribute("DISTINCT_MERCHANT_COUNT_7D", Int64)],
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP('{context.start_time}') - INTERVAL '6 DAYS'
            AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
        GROUP BY USER_ID
        """
Performance Considerations
While incremental backfills simplify your transformation logic, they may also result in many more backfill queries, which can become inefficient at scale.
Leveraging time filters on your data source is especially important to ensure
that each query operates only over the data that it needs. Although you can
accomplish this with your own filter logic within the feature view using the
context parameter, Tecton recommends that you leverage custom Data Source
filters. Using custom filters ensures that data sources are filtered correctly
and that any configured partitions are used for improved filter performance.
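To make the recommendation concrete, here is a minimal sketch, reusing the transactions source and the seven-day window from the examples above, of the preferred source-level filter alongside the in-view alternative it replaces:

from datetime import timedelta
from tecton import TectonTimeConstant

# Preferred: push the time filter down to the Data Source. The filter is applied
# when the source is read, so any configured partitions can be used for pruning.
transactions_last_7d = transactions.select_range(
    start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=6),
    end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
)

# Alternative: filter inside the feature view body using the materialization
# context (as in the second example above). This works, but the filter is applied
# after the source is read, so partition pruning may not be used.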