Version: Beta 🚧

Aggregation Windows

When using Tecton's Aggregation Engine, you can choose between several types of aggregation windows.

Sliding Window

The most common aggregation features aggregate over a fixed-length time window.

Example:

```python
from datetime import datetime, timedelta
from tecton import StreamFeatureView, Entity, Aggregation, StreamSource, PushConfig
from tecton.types import String, Timestamp, Float64, Field


# Define the entity for this feature
user = Entity(name="User", join_keys=["user_id"])

# Define a Streaming Push Source
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="transaction_amount", dtype=Float64),
]
transactions_source = StreamSource(
    name="transactions_source",
    stream_config=PushConfig(log_offline=True),
    schema=input_schema,
)

# Time window aggregation feature
transaction_aggregations = StreamFeatureView(
    name="transaction_aggregations",
    source=transactions_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2018, 1, 1),
    aggregations=[
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=30)),
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="count", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="last", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="max", time_window=timedelta(days=365 * 4)),
        Aggregation(column="transaction_amount", function="min", time_window=timedelta(days=365 * 4)),
    ],
)
```

The Stream Feature View above aggregates over several time windows (30 days, 180 days, and 4 years), each of a fixed length.

These time windows slide forward continuously, giving you the freshest possible streaming time window aggregation features in Tecton.
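To make the sliding-window behavior concrete, here is a plain-Python sketch (not Tecton code) that evaluates a sum over a fixed-length window ending at the query time. The helper name `sliding_window_sum` and the sample events are hypothetical, and treating the window as excluding its start and including the query time is an assumption rather than documented Tecton behavior.

```python
from datetime import datetime, timedelta


# Hypothetical helper illustrating sliding-window semantics; not Tecton's implementation.
# Assumption: the window is (query_time - window, query_time].
def sliding_window_sum(events, query_time, window):
    """Sum the amounts of (timestamp, amount) events that fall inside the window."""
    window_start = query_time - window
    return sum(amount for ts, amount in events if window_start < ts <= query_time)


events = [
    (datetime(2024, 1, 1), 10.0),
    (datetime(2024, 1, 20), 5.0),
    (datetime(2024, 2, 10), 2.5),
]
query_time = datetime(2024, 2, 15)

# The 30-day window starts at 2024-01-16, so only the last two events count.
print(sliding_window_sum(events, query_time, timedelta(days=30)))   # 7.5
# The 180-day window includes all three events.
print(sliding_window_sum(events, query_time, timedelta(days=180)))  # 17.5
```

Because the window end is the query time itself, asking again a moment later can pick up a newly arrived event, which is what "slides forward continuously" means here.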

Sliding Windows that move by a fixed interval

If you don't want your time windows to slide forward continuously, you can instead have them move forward by a fixed interval.

You specify this interval with the aggregation_interval parameter.

Example:

```python
from tecton import stream_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=10),  # Defines how frequently feature values get updated in the online store
    batch_schedule=timedelta(days=1),  # Defines how frequently batch jobs are scheduled to ingest into the offline store
    aggregations=[
        Aggregation(column="amt", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=3)),
        Aggregation(column="amt", function="mean", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=3)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Transaction amount statistics and total over a series of time windows, updated every 10 minutes.",
)
def user_transaction_amount_metrics(transactions):
    return f"""
        SELECT
            user_id,
            amt,
            timestamp
        FROM
            {transactions}
        """
```

The time windows of this StreamFeatureView move forward with a fixed interval of 10 minutes, as specified by aggregation_interval.
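One way to think about a fixed aggregation_interval is that the window's end snaps back to the most recent interval boundary, so every query within the same 10-minute interval sees the same value. The plain-Python sketch below illustrates that idea; the helper `window_end_for_interval` is hypothetical, and aligning interval boundaries to the Unix epoch in UTC is an assumption, not documented Tecton behavior.

```python
from datetime import datetime, timedelta, timezone


# Hypothetical helper, not part of the Tecton SDK.
# Assumption: interval boundaries are aligned to the Unix epoch (UTC).
def window_end_for_interval(query_time, aggregation_interval):
    """Round query_time down to the most recent aggregation_interval boundary."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    completed = (query_time - epoch) // aggregation_interval  # whole intervals elapsed (int)
    return epoch + completed * aggregation_interval


interval = timedelta(minutes=10)
query_time = datetime(2024, 5, 1, 13, 7, 42, tzinfo=timezone.utc)

end = window_end_for_interval(query_time, interval)
print(end)  # 2024-05-01 13:00:00+00:00

# A 1-hour window under this model would cover 12:00 to 13:00,
# and stays the same for any query between 13:00 and 13:09:59.
print(end - timedelta(hours=1), "->", end)
```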

Note

Time windows that move forward by a fixed interval are currently supported only for Spark-based Stream Feature Views.

Sliding Windows that lag by a fixed interval

A common use case in feature engineering is calculating lagging time windows: you ask Tecton to anchor the end of your time window not at the feature query time, but at a point in the past relative to the query time.

Example:

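```python
from datetime import timedelta
from tecton import stream_feature_view, Aggregation, TimeWindow

# `user` and `transactions` are assumed to be an Entity and a stream data source
# defined elsewhere in your feature repository.


@stream_feature_view(
    source=transactions,
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=5),
    aggregations=[
        Aggregation(
            function="sum",
            column="amt",
            # 7-day window whose end lags the query time by 1 day
            time_window=TimeWindow(window_size=timedelta(days=7), offset=timedelta(days=-1)),
        )
    ],
)
def user_transaction_amount_averages(transactions):
    return f"""
        SELECT user_id, timestamp, amt
        FROM {transactions}
        """
```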

The example above produces a 7-day trailing sum that lags by 1 day, as specified by the offset parameter. If you fetch the feature value at 1pm today, Tecton returns a value that aggregates the data between 1pm 8 days ago and 1pm yesterday.
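The window arithmetic behind offset can be sketched in plain Python. `lagged_window_bounds` is a hypothetical helper, not part of the Tecton SDK; it assumes the window end is simply the query time shifted by the (negative) offset, which matches the 1pm example.

```python
from datetime import datetime, timedelta


# Hypothetical helper, not a Tecton API.
def lagged_window_bounds(query_time, window_size, offset):
    """Return (start, end) of a window whose end is shifted by a negative offset."""
    end = query_time + offset  # a negative offset moves the window end into the past
    start = end - window_size
    return start, end


query_time = datetime(2024, 5, 10, 13, 0)  # 1pm "today"
start, end = lagged_window_bounds(query_time, timedelta(days=7), timedelta(days=-1))
print(start)  # 2024-05-02 13:00:00  (1pm, 8 days ago)
print(end)    # 2024-05-09 13:00:00  (1pm yesterday)
```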

Lifetime Windows

Private Preview

This feature is currently in Private Preview.

This feature has the following limitations:
  • Must be enabled by Tecton Support.
If you would like to participate in the preview, please file a feature request.

Lifetime windows continuously expand with new data.

The start time of a lifetime window is specified by the lifetime_start_time parameter. Note that lifetime windows require Data Compaction to be enabled for the Feature View.

See the SDK Reference for more details.
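As a rough mental model, a lifetime window behaves like an aggregation whose start is pinned to lifetime_start_time while its end follows the query time. The sketch below is plain Python with a hypothetical helper (`lifetime_sum`) and made-up sample data; counting an event that lands exactly on lifetime_start_time is an assumption.

```python
from datetime import datetime


# Hypothetical illustration of lifetime-window semantics; not Tecton's implementation.
# Assumption: the window is [lifetime_start_time, query_time].
def lifetime_sum(events, lifetime_start_time, query_time):
    """Sum every (timestamp, amount) event from lifetime_start_time up to query_time."""
    return sum(amount for ts, amount in events if lifetime_start_time <= ts <= query_time)


events = [
    (datetime(2023, 5, 10), 100.0),  # before lifetime_start_time, never counted
    (datetime(2023, 5, 21), 20.0),
    (datetime(2023, 6, 15), 30.0),
]
lifetime_start_time = datetime(2023, 5, 20)

# The window keeps growing as the query time moves forward.
print(lifetime_sum(events, lifetime_start_time, datetime(2023, 6, 1)))  # 20.0
print(lifetime_sum(events, lifetime_start_time, datetime(2024, 1, 1)))  # 50.0
```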

Example:

```python
from datetime import datetime, timedelta
from tecton import BatchFeatureView, Aggregation, LifetimeWindow, TimeWindow

# `user` and `transactions_source` are defined in the Sliding Window example above.

# Lifetime window aggregation feature
transaction_aggregations = BatchFeatureView(
    name="transaction_aggregations",
    source=transactions_source,
    entities=[user],
    online=True,
    offline=True,
    batch_compaction_enabled=True,  # lifetime windows require Data Compaction
    feature_start_time=datetime(2023, 5, 30),
    lifetime_start_time=datetime(2023, 5, 20),
    aggregations=[
        Aggregation(column="transaction_amount", function="sum", time_window=LifetimeWindow()),
        Aggregation(column="transaction_amount", function="sum", time_window=TimeWindow(window_size=timedelta(days=5))),
        Aggregation(column="transaction_amount", function="sum", time_window=TimeWindow(window_size=timedelta(days=10))),
    ],
)
```

Note that lifetime_start_time should be earlier than or equal to the materialization start time of the largest non-lifetime time window.

In the example above:

  • The feature_start_time is 2023-05-30 and the largest TimeWindow is 10 days, so Tecton starts materializing data for that window on 2023-05-20 (10 days before feature_start_time).
  • lifetime_start_time should therefore be 2023-05-20 or earlier.
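The rule above reduces to a single subtraction. This hypothetical helper (not a Tecton API) computes the latest permissible lifetime_start_time from feature_start_time and the sizes of the non-lifetime windows, using the values from the example.

```python
from datetime import datetime, timedelta


# Hypothetical helper mirroring the rule described above; not part of the Tecton SDK.
def latest_allowed_lifetime_start(feature_start_time, time_windows):
    """The largest non-lifetime window determines how far back materialization starts."""
    return feature_start_time - max(time_windows)


feature_start_time = datetime(2023, 5, 30)
windows = [timedelta(days=5), timedelta(days=10)]

latest = latest_allowed_lifetime_start(feature_start_time, windows)
print(latest.date())  # 2023-05-20

lifetime_start_time = datetime(2023, 5, 20)
print(lifetime_start_time <= latest)  # True, so this configuration satisfies the rule
```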

Sliding Window Series (Coming soon)

Sliding Window Series compute a series of fixed-size time windows, one per step, across a given fixed time range:

(Figure: Sliding Time Window Series)

Example:

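```python
from datetime import timedelta
from tecton import stream_feature_view, Aggregation, SlidingWindowSeries

# `user` and `transactions` are assumed to be an Entity and a stream data source
# defined elsewhere in your feature repository.


@stream_feature_view(
    source=transactions,
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=5),
    aggregations=[
        Aggregation(
            function="sum",
            column="amt",
            # 2-day windows, hopping by 1 day, within the range set by start and stop
            time_window=SlidingWindowSeries(
                start=timedelta(days=-7),
                stop=timedelta(days=-3),
                step_size=timedelta(days=1),
                window_size=timedelta(days=2),
            ),
        )
    ],
)
def user_transaction_amount_averages(transactions):
    return f"""
        SELECT user_id, timestamp, amt
        FROM {transactions}
        """
```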

The feature defined above produces several hopping time window aggregations for any given point of time. For instance, at feature request time, the example above will produce an array of the following time window aggregations:

  • Start: Now - 5 days. End: Now - 3 days
  • Start: Now - 6 days. End: Now - 4 days
  • Start: Now - 7 days. End: Now - 5 days
  • Start: Now - 8 days. End: Now - 6 days

You configure the produced series with the following parameters:

  • window_size: the fixed length of each time window to aggregate over
  • step_size: the hopping interval between consecutive windows
  • start: the start of the time window series
  • stop: the end of the time window series
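This section doesn't fully spell out how start and stop bound the series. The plain-Python sketch below assumes that window end points step back from stop by step_size for as long as they remain later than start; under that reading it reproduces the four windows listed for the example. The helper `sliding_window_series` is hypothetical and is not the Tecton implementation.

```python
from datetime import timedelta


# Hypothetical helper, not a Tecton API. Offsets are relative to the request time.
# Assumption: window ends run from `stop` backward by `step_size`, excluding `start`.
def sliding_window_series(start, stop, step_size, window_size):
    """Return a list of (window_start, window_end) offsets."""
    if step_size <= timedelta(0):
        raise ValueError("step_size must be positive")
    windows = []
    end = stop
    while end > start:
        windows.append((end - window_size, end))
        end -= step_size
    return windows


series = sliding_window_series(
    start=timedelta(days=-7),
    stop=timedelta(days=-3),
    step_size=timedelta(days=1),
    window_size=timedelta(days=2),
)
for window_start, window_end in series:
    print(f"Start: Now - {-window_start.days} days. End: Now - {-window_end.days} days")
# Start: Now - 5 days. End: Now - 3 days
# Start: Now - 6 days. End: Now - 4 days
# Start: Now - 7 days. End: Now - 5 days
# Start: Now - 8 days. End: Now - 6 days
```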
