Version: 0.7

Aggregation Windows

In Tecton 0.7, Tecton's Aggregation Engine supports sliding time windows: each aggregation is computed over a time window of fixed length (for example, the last 30 days).
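As a rough illustration of what a fixed-length window means, the following plain-Python sketch (not Tecton code) sums the events that fall in a window of length `window` ending at a request time. The function name `sliding_window_sum`, the sample events, and the half-open boundary convention [request_time - window, request_time) are assumptions made for this sketch; Tecton's exact boundary handling may differ.

```python
from datetime import datetime, timedelta

# Hypothetical sample events: (event timestamp, transaction amount).
events = [
    (datetime(2024, 1, 1), 10.0),
    (datetime(2024, 1, 20), 5.0),
    (datetime(2024, 3, 1), 7.0),
]


def sliding_window_sum(
    events: list[tuple[datetime, float]], request_time: datetime, window: timedelta
) -> float:
    """Sum amounts with timestamps in [request_time - window, request_time).

    The half-open boundary convention is an assumption of this sketch.
    """
    start = request_time - window
    return sum(amount for ts, amount in events if start <= ts < request_time)


# The window length stays fixed; only its end point depends on request_time.
print(sliding_window_sum(events, datetime(2024, 3, 2), timedelta(days=30)))   # only the March 1 event
print(sliding_window_sum(events, datetime(2024, 3, 2), timedelta(days=180)))  # all three events
```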

Note

Tecton versions later than 0.7 support additional types of aggregation windows. See the documentation for the corresponding version.

Example

from datetime import datetime, timedelta
from tecton import StreamFeatureView, Entity, Aggregation, PushSource
from tecton.types import String, Timestamp, Float64, Field


# Define the entity for this feature
user = Entity(name="User", join_keys=["user_id"])

# Define a Streaming Push Source
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="transaction_amount", dtype=Float64),
]
transactions_source = PushSource(
    name="transactions_source",
    schema=input_schema,
)

# Time window aggregation feature
transaction_aggregations = StreamFeatureView(
    name="transaction_aggregations",
    source=transactions_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2018, 1, 1),
    aggregations=[
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=30)),
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="count", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="last", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="max", time_window=timedelta(days=365 * 4)),
        Aggregation(column="transaction_amount", function="min", time_window=timedelta(days=365 * 4)),
    ],
)

The Stream Feature View above aggregates over several time windows (30 days, 180 days, and 365 * 4 = 1,460 days). Each of these windows has a fixed length.

These time windows slide forward continuously, which gives you the freshest streaming time window aggregation features that Tecton provides.
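The following plain-Python sketch (not Tecton's implementation) mirrors the six aggregations of the example above for one user's events held in memory. Every window ends exactly at the request time, so moving the request time by a few hours is enough for an old event to leave the 30-day window and a new one to enter it. The sample events, the `aggregate` and `compute_features` helpers, the feature key names, the half-open [start, end) boundaries, and returning `None` for `last`/`max`/`min` over an empty window are all assumptions for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical events for one user: (timestamp, transaction_amount).
events = [
    (datetime(2024, 1, 10, 9, 0), 20.0),
    (datetime(2024, 2, 5, 14, 30), 50.0),
    (datetime(2024, 2, 9, 8, 15), 5.0),
]

# The (function, time_window) pairs from the StreamFeatureView above.
AGGREGATIONS = [
    ("sum", timedelta(days=30)),
    ("sum", timedelta(days=180)),
    ("count", timedelta(days=180)),
    ("last", timedelta(days=180)),
    ("max", timedelta(days=365 * 4)),
    ("min", timedelta(days=365 * 4)),
]


def aggregate(function: str, window_events: list[tuple[datetime, float]]):
    """Apply one aggregation function to the (timestamp, amount) pairs in a window."""
    amounts = [amount for _, amount in window_events]
    if function == "sum":
        return sum(amounts)
    if function == "count":
        return len(amounts)
    if not amounts:
        return None  # assumption: no value for last/max/min over an empty window
    if function == "last":
        return max(window_events, key=lambda e: e[0])[1]  # amount of the latest event
    if function == "max":
        return max(amounts)
    if function == "min":
        return min(amounts)
    raise ValueError(f"unsupported function: {function}")


def compute_features(events, request_time: datetime) -> dict:
    """Evaluate every aggregation over a window that ends exactly at request_time."""
    features = {}
    for function, window in AGGREGATIONS:
        start = request_time - window
        in_window = [(ts, amt) for ts, amt in events if start <= ts < request_time]
        features[f"{function}_{window.days}d"] = aggregate(function, in_window)
    return features


print(compute_features(events, datetime(2024, 2, 9, 8, 0))["sum_30d"])   # January 10 and February 5 events
print(compute_features(events, datetime(2024, 2, 9, 12, 0))["sum_30d"])  # February 5 and February 9 events
```

Between the two request times, the January 10 event falls out of the 30-day window and the February 9 event enters it, so `sum_30d` changes from 70.0 to 55.0.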

Sliding Windows that move by a fixed interval

If you don't want your time windows to slide forward continuously, you can instead specify a fixed interval by which they move forward.

Set this interval with the aggregation_interval parameter.
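One way to model a window that moves in fixed steps is to snap the window end back to the most recent interval boundary, so every request within the same interval sees the same window. The plain-Python sketch below does this; the `window_bounds` helper and its alignment of boundaries to whole multiples of the interval since the Unix epoch (UTC) are assumptions for illustration, not a description of how Tecton implements aggregation_interval.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_bounds(
    request_time: datetime, window: timedelta, interval: timedelta
) -> tuple[datetime, datetime]:
    """Return the [start, end) bounds of a window that advances in steps of `interval`.

    Assumption for this sketch: window ends are aligned to whole multiples of
    `interval` counted from the Unix epoch, so the still-open interval is excluded.
    """
    completed = (request_time - EPOCH) // interval  # number of whole intervals elapsed
    end = EPOCH + completed * interval
    return end - window, end


utc = timezone.utc
ten_minutes = timedelta(minutes=10)
one_hour = timedelta(hours=1)

# Requests at 12:01 and 12:09 share one window; at 12:10 the window moves forward.
for minute in (1, 9, 10):
    start, end = window_bounds(datetime(2024, 5, 1, 12, minute, tzinfo=utc), one_hour, ten_minutes)
    print(f"request 12:{minute:02d} -> window {start:%H:%M} to {end:%H:%M}")
```

The trade-off is freshness: with a 10-minute interval, an event can take up to one full interval to be reflected in the window.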

Example:

from tecton import stream_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=10),  # Defines how frequently feature values get updated in the online store
    batch_schedule=timedelta(days=1),  # Defines how frequently batch jobs are scheduled to ingest into the offline store
    aggregations=[
        Aggregation(column="amt", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=3)),
        Aggregation(column="amt", function="mean", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=3)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Transaction amount statistics and total over a series of time windows, updated every 10 minutes.",
)
def user_transaction_amount_metrics(transactions):
    return f"""
        SELECT
            user_id,
            amt,
            timestamp
        FROM
            {transactions}
        """

The time windows of this Stream Feature View move forward in fixed steps of 10 minutes, as set by aggregation_interval=timedelta(minutes=10).
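A fixed interval also makes it possible to pre-aggregate events into per-interval partial results ("tiles") and combine those tiles at request time. The plain-Python sketch below is a simplified, hypothetical model of that idea for the 1-hour `sum` and `mean` features in the example; the helper names, the epoch-aligned 10-minute tiles, and the rule that only fully elapsed intervals count are assumptions, not a description of how Tecton implements it.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
INTERVAL = timedelta(minutes=10)  # same value as aggregation_interval in the example


def tile_start(ts: datetime) -> datetime:
    """Start of the epoch-aligned 10-minute interval containing ts (alignment is an assumption)."""
    return EPOCH + ((ts - EPOCH) // INTERVAL) * INTERVAL


def build_tiles(events: list[tuple[datetime, float]]) -> dict[datetime, list]:
    """Pre-aggregate (timestamp, amt) events into per-interval [sum, count] partials."""
    tiles = defaultdict(lambda: [0.0, 0])
    for ts, amt in events:
        tile = tiles[tile_start(ts)]
        tile[0] += amt
        tile[1] += 1
    return dict(tiles)


def window_features(tiles, request_time: datetime, window: timedelta) -> dict:
    """Combine the fully elapsed tiles inside the window into sum and mean features."""
    end = tile_start(request_time)  # the current, still-open interval is excluded
    start = end - window
    total, count = 0.0, 0
    for tile_time, (tile_sum, tile_count) in tiles.items():
        if start <= tile_time < end:
            total += tile_sum
            count += tile_count
    return {"sum": total, "mean": total / count if count else None}


utc = timezone.utc
events = [
    (datetime(2024, 5, 1, 11, 5, tzinfo=utc), 10.0),
    (datetime(2024, 5, 1, 11, 42, tzinfo=utc), 30.0),
    (datetime(2024, 5, 1, 12, 3, tzinfo=utc), 50.0),
]
tiles = build_tiles(events)

# The two requests fall in different 10-minute intervals, so they see different 1-hour windows.
print(window_features(tiles, datetime(2024, 5, 1, 12, 7, tzinfo=utc), timedelta(hours=1)))
print(window_features(tiles, datetime(2024, 5, 1, 12, 10, tzinfo=utc), timedelta(hours=1)))
```

Keeping a (sum, count) pair per tile is what lets `mean` be combined correctly across tiles; averaging per-tile means instead would weight every tile equally regardless of how many events it holds.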

Note

Time windows that move forward by a fixed interval are currently supported only for Spark-based Stream Feature Views.
