Version: 0.8

Aggregation Windows

When using Tecton's Aggregation Engine, you can choose between different types of aggregation windows.

Sliding Window

The most common aggregation features aggregate over a fixed-length time window.

Example:

from datetime import datetime, timedelta
from tecton import StreamFeatureView, Entity, Aggregation, StreamSource, PushConfig
from tecton.types import String, Timestamp, Float64, Field


# Define the entity for this feature
user = Entity(name="User", join_keys=["user_id"])

# Define a Streaming Push Source
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="transaction_amount", dtype=Float64),
]
transactions_source = StreamSource(
    name="transactions_source",
    stream_config=PushConfig(log_offline=True),
    schema=input_schema,
)

# Time window aggregation feature
transaction_aggregations = StreamFeatureView(
    name="transaction_aggregations",
    source=transactions_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2018, 1, 1),
    aggregations=[
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=30)),
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="count", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="last", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="max", time_window=timedelta(days=365 * 4)),
        Aggregation(column="transaction_amount", function="min", time_window=timedelta(days=365 * 4)),
    ],
)

The Stream Feature View example above aggregates over several time windows, each of a fixed length.

These time windows slide forward continuously, which gives you the freshest possible streaming time window aggregation features in Tecton.
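To make "fixed-length, continuously sliding" concrete, here is a minimal pure-Python sketch. It does not use Tecton. It computes the same functions as the example above (sum, count, last, max, min) over in-memory events for a single user. The window ends exactly at the query time. The boundary convention (inclusive start, exclusive end), the `events` data, and the `sliding_window_aggregates` helper are all hypothetical and are not taken from Tecton's implementation.

```python
from datetime import datetime, timedelta

# Hypothetical in-memory events for one user_id: (timestamp, transaction_amount).
events = [
    (datetime(2024, 1, 1, 12), 10.0),
    (datetime(2024, 1, 20, 12), 25.0),
    (datetime(2024, 3, 1, 12), 5.0),
]


def sliding_window_aggregates(events, query_time, window):
    """Aggregate the events that fall inside [query_time - window, query_time).

    The window end is the query time itself, so the window slides continuously.
    The boundary convention (inclusive start, exclusive end) is an assumption.
    """
    start = query_time - window
    in_window = sorted((ts, amt) for ts, amt in events if start <= ts < query_time)
    amounts = [amt for _, amt in in_window]
    return {
        "sum": sum(amounts),
        "count": len(amounts),
        "last": amounts[-1] if amounts else None,  # value of the most recent event
        "max": max(amounts, default=None),
        "min": min(amounts, default=None),
    }


query_time = datetime(2024, 3, 2, 12)
thirty_day = sliding_window_aggregates(events, query_time, timedelta(days=30))
half_year = sliding_window_aggregates(events, query_time, timedelta(days=180))
```

Because the start is always `query_time - window`, asking again a few seconds later moves both edges of the window by the same amount.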

Sliding Windows that move by a fixed interval

If you don't want your time windows to slide forward continuously, you can instead have them move forward by a fixed interval.

You specify the interval with the aggregation_interval parameter.

Example:

from tecton import stream_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=10),  # Defines how frequently feature values get updated in the online store
    batch_schedule=timedelta(days=1),  # Defines how frequently batch jobs are scheduled to ingest into the offline store
    aggregations=[
        Aggregation(column="amt", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=3)),
        Aggregation(column="amt", function="mean", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=3)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Transaction amount statistics and total over a series of time windows, updated every 10 minutes.",
)
def user_transaction_amount_metrics(transactions):
    return f"""
        SELECT
            user_id,
            amt,
            timestamp
        FROM
            {transactions}
        """

The time windows of this StreamFeatureView move forward with a fixed interval of 10 minutes, as specified by aggregation_interval.

note

Time windows that move forward by a fixed interval are currently only supported with Spark-based Stream Feature Views.
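The following pure-Python sketch illustrates how a window that moves by aggregation_interval differs from a continuously sliding one. It rests on one assumption, which is not a statement of Tecton's internals: the window end is the query time rounded down to the most recent interval boundary, and those boundaries are aligned to the Unix epoch. The helper `interval_window_bounds` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def interval_window_bounds(query_time, window, aggregation_interval):
    """Return (start, end) for a window that advances in aggregation_interval steps.

    Assumption: the window end is the query time rounded down to the most recent
    interval boundary, with boundaries aligned to the Unix epoch.
    """
    intervals_elapsed = (query_time - EPOCH) // aggregation_interval  # an int
    end = EPOCH + intervals_elapsed * aggregation_interval
    return end - window, end


ten_minutes = timedelta(minutes=10)
one_hour = timedelta(hours=1)

# Every query between 12:00:00 and 12:09:59 sees the same 1-hour window.
early = interval_window_bounds(datetime(2024, 5, 1, 12, 0, 5, tzinfo=timezone.utc), one_hour, ten_minutes)
late = interval_window_bounds(datetime(2024, 5, 1, 12, 9, 59, tzinfo=timezone.utc), one_hour, ten_minutes)
# At 12:10 the window jumps forward by one full interval.
next_step = interval_window_bounds(datetime(2024, 5, 1, 12, 10, tzinfo=timezone.utc), one_hour, ten_minutes)
```

Between boundaries, the feature value stays constant. This is the trade-off for not recomputing the window on every event.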

Sliding Windows that lag by a fixed interval

A common use case in feature engineering is to calculate lagging time windows. Instead of anchoring the end of the time window to the feature query time, you ask Tecton to anchor it to a point in time a fixed interval in the past, relative to the query time.

Example:

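from datetime import timedelta
from tecton import stream_feature_view, FilteredSource, Aggregation, TimeWindow
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream


@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=5),
    aggregations=[
        Aggregation(
            function="sum",
            column="amt",
            # 7-day window whose end is anchored 1 day before the feature query time
            time_window=TimeWindow(window_size=timedelta(days=7), offset=timedelta(days=-1)),
        )
    ],
)
def user_transaction_amount_averages(transactions):
    return f"""
        SELECT user_id, timestamp, amt
        FROM {transactions}
        """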

The example above produces a 7-day trailing sum aggregation that lags behind by 1 day, as specified by the offset parameter. If you fetch the feature value at 1pm today, Tecton returns a value that aggregates the data between 1pm 8 days ago and 1pm yesterday.
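The 1pm arithmetic above can be checked with a small pure-Python sketch. It assumes the offset is simply added to the query time to get the window end, and that the start lies window_size before that end. This reproduces the documented example. The helper `lagged_window_bounds` is hypothetical, and for simplicity it ignores any rounding to aggregation_interval.

```python
from datetime import datetime, timedelta


def lagged_window_bounds(query_time, window_size, offset):
    """Return (start, end) of a trailing window whose end is shifted by `offset`.

    A negative offset moves the window end into the past relative to the query time.
    Rounding to aggregation_interval is deliberately ignored in this sketch.
    """
    end = query_time + offset
    start = end - window_size
    return start, end


# Fetching at 1pm "today" with a 7-day window lagged by 1 day.
query_time = datetime(2024, 6, 10, 13, 0)
start, end = lagged_window_bounds(query_time, timedelta(days=7), timedelta(days=-1))
# start is 1pm eight days earlier (2024-06-02), end is 1pm yesterday (2024-06-09).
```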

Lifetime Window (Coming soon)

Lifetime windows continuously expand and are not of a fixed length.

The lifetime start time is specified by the feature_start_time parameter.

Example:

from datetime import datetime
from tecton import StreamFeatureView, Aggregation, LifetimeWindow

# user and transactions_source are defined in the Sliding Window example above.
# Lifetime window aggregation feature
transaction_aggregations = StreamFeatureView(
    name="transaction_aggregations",
    source=transactions_source,
    entities=[user],
    online=True,
    offline=True,
    aggregations=[
        Aggregation(
            column="transaction_amount",
            function="sum",
            time_window=LifetimeWindow(start_time=datetime(2018, 1, 1)),
        )
    ],
)
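The expanding behavior of a lifetime window can be modeled in a few lines of pure Python. The sketch below pins the start at a fixed time and lets the end follow the query time, so later queries cover more data. The `events` data and the `lifetime_sum` helper are hypothetical. The exclusive end boundary is an assumption for illustration, not a documented Tecton rule.

```python
from datetime import datetime

# Hypothetical events for one user_id: (timestamp, transaction_amount).
events = [
    (datetime(2017, 12, 31), 999.0),  # before the lifetime start, never counted
    (datetime(2018, 3, 1), 100.0),
    (datetime(2020, 7, 15), 50.0),
    (datetime(2023, 11, 2), 25.0),
]
start_time = datetime(2018, 1, 1)


def lifetime_sum(events, query_time, start_time):
    """Sum amounts in [start_time, query_time).

    The start is pinned, so the window keeps growing as query_time advances.
    The exclusive end is an assumption for illustration.
    """
    return sum(amt for ts, amt in events if start_time <= ts < query_time)


early_total = lifetime_sum(events, datetime(2019, 1, 1), start_time)
later_total = lifetime_sum(events, datetime(2024, 1, 1), start_time)
```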

Sliding Window Series (Coming soon)

Sliding Window Series compute a series of time windows, one for each step within a given fixed time range:

Figure: Sliding Time Window Series

Example:

from datetime import timedelta
from tecton import stream_feature_view, FilteredSource, Aggregation, SlidingWindowSeries
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream


@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=5),
    aggregations=[
        Aggregation(
            function="sum",
            column="amt",
            time_window=SlidingWindowSeries(
                start=timedelta(days=-7),
                stop=timedelta(days=-3),
                step_size=timedelta(days=1),
                window_size=timedelta(days=2),
            ),
        )
    ],
)
def user_transaction_amount_averages(transactions):
    return f"""
        SELECT user_id, timestamp, amt
        FROM {transactions}
        """

The feature defined above produces several hopping time window aggregations for any given point in time. For example, at feature request time it returns an array containing the aggregations for the following time windows:

  • Start: Now - 5 days. End: Now - 3 days
  • Start: Now - 6 days. End: Now - 4 days
  • Start: Now - 7 days. End: Now - 5 days
  • Start: Now - 8 days. End: Now - 6 days

You configure the produced series with the following parameters:

  • window_size: the fixed length of each time window being aggregated
  • step_size: the hopping interval between consecutive windows
  • start: the start of the time window series, relative to the feature request time
  • stop: the end of the time window series, relative to the feature request time
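Because this API is not yet released, the exact boundary rules are not documented here. The sketch below is a pure-Python model whose rule was inferred by working backward from the four windows listed above, so treat it as a guess rather than a specification. Window ends step back from `query_time + stop` in `step_size` increments for as long as they remain later than `query_time + start`. Each window spans `window_size` before its end. The function `sliding_window_series` is hypothetical.

```python
from datetime import datetime, timedelta


def sliding_window_series(query_time, start, stop, step_size, window_size):
    """Return (window_start, window_end) pairs, newest window first.

    Assumed semantics, inferred from the documented example: window ends step back
    from query_time + stop by step_size while they remain later than
    query_time + start, and each window covers window_size before its end.
    """
    if step_size <= timedelta(0):
        raise ValueError("step_size must be positive")
    windows = []
    end = query_time + stop
    while end > query_time + start:
        windows.append((end - window_size, end))
        end -= step_size
    return windows


now = datetime(2024, 6, 10)
series = sliding_window_series(
    now,
    start=timedelta(days=-7),
    stop=timedelta(days=-3),
    step_size=timedelta(days=1),
    window_size=timedelta(days=2),
)
# Express each window as whole days relative to `now`, e.g. (-5, -3).
relative_days = [((s - now).days, (e - now).days) for s, e in series]
```

Under this assumed rule, the parameters from the example yield the same four windows listed above, from Now - 5 days to Now - 3 days back to Now - 8 days to Now - 6 days.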
