Python Transformations with the Stream Ingest API
The Stream Ingest API can be used with Tecton's Serverless Python Engine to run transformations at request time (that is, when records are sent to the API) and with the Aggregation Engine to define efficient time-windowed aggregations. The same transformations also run offline during backfills.
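As a minimal sketch of what running "the same transformations" online and offline means, the example below assumes a transformation written as a plain Python function over one record (a dict). The online ingestion path and an offline backfill then share a single function definition. The helpers ingest_record and backfill are hypothetical stand-ins for illustration, not Tecton APIs.

```python
from datetime import datetime


def add_clicked_squared(record: dict) -> dict:
    """Record-level transformation: derive clicked_squared from clicked."""
    out = dict(record)  # copy so the caller's record is not mutated
    out["clicked_squared"] = out["clicked"] ** 2
    return out


def ingest_record(record: dict) -> dict:
    # Hypothetical online path: one pushed record is transformed as it arrives.
    return add_clicked_squared(record)


def backfill(records: list) -> list:
    # Hypothetical offline path: historical records are replayed through the same function.
    return [add_clicked_squared(r) for r in records]


history = [
    {"user_id": "u1", "timestamp": datetime(2022, 10, 10, 9), "clicked": 1},
    {"user_id": "u1", "timestamp": datetime(2022, 10, 10, 10), "clicked": 3},
]

# Both paths produce identical rows for the same input.
assert backfill(history) == [ingest_record(r) for r in history]
print(backfill(history)[1]["clicked_squared"])  # 9
```

Because both paths call one function, the features served online and the features computed for training cannot drift apart through duplicated logic.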
The following example creates a Stream Feature View that processes events sent through the Stream Ingest API.
from datetime import datetime, timedelta
from tecton import stream_feature_view, Attribute
from tecton.types import Int64
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

# Feature columns present in each record after the transformation runs.
features = [
    Attribute(name="clicked", dtype=Int64),
    Attribute(name="clicked_squared", dtype=Int64),
]

@stream_feature_view(
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    mode="python",
    timestamp_field="timestamp",
    features=features,
)
def content_keyword_click_counts_python(user_click_push_source):
    # Add the derived column to the incoming record and return it.
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source
- The features list describes the feature columns of the records after transformation: the incoming clicked column and the derived clicked_squared column.
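To check that a transformation's output lines up with the declared features, the function body above can be exercised directly in plain Python. This sketch assumes that, in mode="python", the function receives one record as a dict. DECLARED_FEATURES is a hypothetical stand-in for the features list (not a Tecton object), and the sample record is invented for illustration.

```python
def content_keyword_click_counts_python(user_click_push_source: dict) -> dict:
    # Same body as the Stream Feature View above, called on one record dict.
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source


# Hypothetical mirror of the features list: column name -> expected Python type.
DECLARED_FEATURES = {"clicked": int, "clicked_squared": int}

record = {"user_id": "u42", "timestamp": "2022-10-11T12:00:00Z", "clicked": 3}
transformed = content_keyword_click_counts_python(record)

# Every declared feature must be present in the transformed record with the expected type.
for name, expected_type in DECLARED_FEATURES.items():
    assert isinstance(transformed[name], expected_type), name

print(transformed)
# {'user_id': 'u42', 'timestamp': '2022-10-11T12:00:00Z', 'clicked': 3, 'clicked_squared': 9}
```

A check like this catches a transformation that forgets to emit a declared column before the feature view is applied.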
Transformations with Time-Windowed Aggregations
Below is an example Stream Feature View that uses a Push Source with both a transformation and time-windowed aggregations.
from datetime import datetime, timedelta
from tecton import stream_feature_view, Aggregate
from tecton.types import Field, Int64
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

@stream_feature_view(
    source=impressions_event_source,
    entities=[user],
    online=True,  # When online=False, the ingest operation validates records but won't write to the online store.
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    timestamp_field="timestamp",
    features=[
        # Aggregations read the transformed records, so clicked_squared is available here.
        Aggregate(input_column=Field("clicked", Int64), function="count", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("clicked", Int64), function="count", time_window=timedelta(hours=12)),
        Aggregate(input_column=Field("clicked_squared", Int64), function="sum", time_window=timedelta(hours=24)),
    ],
    description="The aggregated count of ad clicks for a user",
    mode="python",
)
def content_keyword_click_counts_python(user_click_push_source):
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source
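The Aggregation Engine computes these windows for you. Purely to illustrate what the three aggregations above mean, the plain-Python sketch below evaluates them for one user at a single point in time. It assumes windows that exclude their start time and include their end time, ignores aggregation intervals, tiling, and null handling, and uses hypothetical feature names rather than the names Tecton generates.

```python
from datetime import datetime, timedelta

# Transformed records for one user (clicked_squared already added by the transformation).
events = [
    {"timestamp": datetime(2022, 10, 11, 0, 30), "clicked": 1, "clicked_squared": 1},
    {"timestamp": datetime(2022, 10, 11, 8, 0), "clicked": 2, "clicked_squared": 4},
    {"timestamp": datetime(2022, 10, 11, 11, 45), "clicked": 3, "clicked_squared": 9},
]


def window_aggregate(events, column, function, window, as_of):
    """Aggregate `column` over events where as_of - window < timestamp <= as_of."""
    values = [e[column] for e in events if as_of - window < e["timestamp"] <= as_of]
    if function == "count":
        return len(values)
    if function == "sum":
        return sum(values)
    raise ValueError(f"unsupported function: {function}")


as_of = datetime(2022, 10, 11, 12, 0)
features = {
    "clicked_count_1h": window_aggregate(events, "clicked", "count", timedelta(hours=1), as_of),
    "clicked_count_12h": window_aggregate(events, "clicked", "count", timedelta(hours=12), as_of),
    "clicked_squared_sum_24h": window_aggregate(
        events, "clicked_squared", "sum", timedelta(hours=24), as_of
    ),
}
print(features)
# {'clicked_count_1h': 1, 'clicked_count_12h': 3, 'clicked_squared_sum_24h': 14}
```

Note that the 24-hour sum reads clicked_squared, a column that exists only because the transformation ran before aggregation.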
When combining transformations with time-windowed aggregations, the input_column of each Aggregate refers to a column of the records after transformation; clicked_squared, for example, exists only because the transformation adds it. Tecton computes the schema of the post-aggregation records automatically.