Version: 0.9

Python Transformations with the Stream Ingest API

The Stream Ingest API can be used with Tecton's Serverless Python Engine to run transformations at request time, and with the Aggregation Engine to define efficient time-windowed aggregations. The same transformations are also run offline during any backfills.

The following example shows creating a Stream Feature View that processes events from the Stream Ingest API.

from datetime import datetime, timedelta
from tecton import stream_feature_view
from tecton.types import Field, String, Timestamp, Int64
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

output_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
    Field(name="clicked_squared", dtype=Int64),
]


@stream_feature_view(
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    mode="python",
    schema=output_schema,
)
def content_keyword_click_counts_python(user_click_push_source):
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source
  • The output_schema refers to the schema of the records after transformation.
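To make the relationship between the transformation and its schema concrete, here is a minimal sketch in plain Python with no Tecton dependency. It assumes each record reaches the function as a dict keyed by field name. The helper add_clicked_squared, the EXPECTED_FIELDS set, and the sample event values are hypothetical and exist only for illustration.

```python
from datetime import datetime, timezone

# Field names expected after transformation, mirroring output_schema.
EXPECTED_FIELDS = {"user_id", "timestamp", "clicked", "clicked_squared"}


def add_clicked_squared(record: dict) -> dict:
    """Apply the same logic as the feature view body to a copy of one record."""
    transformed = dict(record)  # copy so the caller's dict is not mutated
    transformed["clicked_squared"] = transformed["clicked"] ** 2
    return transformed


# Hypothetical incoming event; the values are made up for illustration.
event = {
    "user_id": "user_123",
    "timestamp": datetime(2022, 10, 11, 12, 0, tzinfo=timezone.utc),
    "clicked": 3,
}

result = add_clicked_squared(event)

# Every field declared in the schema should be present after transformation.
missing = EXPECTED_FIELDS - result.keys()
print(result["clicked_squared"], missing)
```

The incoming event carries only three fields. The fourth, clicked_squared, exists only after the transformation runs, which is why the schema has to describe the transformed record rather than the raw event.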

Transformations with Time-Windowed Aggregations

Below is an example Stream Feature View that uses a Push Source and combines a Python transformation with time-windowed aggregations.

from datetime import datetime, timedelta
from tecton import stream_feature_view, Aggregation
from tecton.types import Field, String, Timestamp, Int64

# Same entity and source as in the first example.
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

# Schema of the records after transformation (before aggregation).
output_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
    Field(name="clicked_squared", dtype=Int64),
]


@stream_feature_view(
    source=impressions_event_source,
    entities=[user],
    # When online=False, the ingest operation still performs validation but won't write to the online store.
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregations=[
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=1)),
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=12)),
        Aggregation(column="clicked_squared", function="sum", time_window=timedelta(hours=24)),
    ],
    description="The aggregated count of ad clicks for a user",
    mode="python",
    schema=output_schema,
)
def content_keyword_click_counts_python(user_click_push_source):
    # Add the derived column that the 24-hour sum aggregation reads.
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source

When using both transformations and Time-Windowed Aggregations, the schema parameter (output_schema in the example) describes the records after transformation and before aggregation. Tecton computes the schema of the post-aggregation records automatically.
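The following plain-Python sketch illustrates what the three aggregations in the example compute for one user at a single point in time. It is not how Tecton's Aggregation Engine computes them internally. The helpers window_count and window_sum, the sample events, and the window boundary treatment (start exclusive, end inclusive) are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def window_count(events, column, now, window):
    """Count events in (now - window, now] whose column is not None."""
    start = now - window
    return sum(
        1
        for e in events
        if start < e["timestamp"] <= now and e[column] is not None
    )


def window_sum(events, column, now, window):
    """Sum the column over events in (now - window, now]."""
    start = now - window
    return sum(e[column] for e in events if start < e["timestamp"] <= now)


now = datetime(2022, 10, 11, 12, 0)

# Hypothetical events for one user, oldest last.
events = [
    {"user_id": "user_123", "timestamp": now - timedelta(minutes=30), "clicked": 1},
    {"user_id": "user_123", "timestamp": now - timedelta(hours=5), "clicked": 2},
    {"user_id": "user_123", "timestamp": now - timedelta(hours=20), "clicked": 3},
]

# Transformation step: runs on each record before any aggregation.
for e in events:
    e["clicked_squared"] = e["clicked"] ** 2

# Aggregation step: one value per (column, function, time_window).
count_1h = window_count(events, "clicked", now, timedelta(hours=1))
count_12h = window_count(events, "clicked", now, timedelta(hours=12))
sum_24h = window_sum(events, "clicked_squared", now, timedelta(hours=24))
print(count_1h, count_12h, sum_24h)
```

The order of the two steps is the point of the sketch: clicked_squared must exist on each transformed record before the 24-hour sum can read it, so the declared schema covers the transformed records while the aggregated outputs are derived from them.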
