Skip to main content
Version: 0.9

Time-Windowed Aggregations with Stream Ingest API

The Stream Ingest API is specifically designed to integrate with Tecton's Aggregation Engine. See the Aggregations Reference for the full list of supported aggregation functions.

from datetime import timedelta, datetime
from tecton import StreamFeatureView, Aggregation
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

user_click_counts_wafv = StreamFeatureView(
name="user_click_counts_wafv",
source=impressions_event_source,
entities=[user],
online=True,
offline=True,
feature_start_time=datetime(2023, 1, 1),
aggregations=[
Aggregation(column="clicked", function="count", time_window=timedelta(hours=1)),
Aggregation(column="clicked", function="count", time_window=timedelta(hours=24)),
Aggregation(column="clicked", function="count", time_window=timedelta(hours=72)),
],
description="The count of ad clicks for a user",
)

To learn more about the performance of aggregations with the Stream Ingest API, see the Continuous Windows section on aggregation performance.

Was this page helpful?