Version: Beta 🚧

Complex Data Types with Stream Ingest API

This example shows how to incorporate Struct and Map data types into a Tecton schema, enabling the processing of more complex and rich data structures in your streaming data. Refer to Data Types for more information on all Data Types supported by Tecton.

Stream Source with Complex Data Types

from tecton.types import Field, Float64, String, Timestamp, Struct, Map
from tecton import StreamSource, PushConfig

# Example Struct for product details
product_details_struct = Struct(
    [
        Field("product_id", dtype=String),
        Field("category", dtype=String),
        Field("price", dtype=Float64),
    ]
)

# Schema with Struct and Map for Stream Source
schema = [
    Field("user_id", dtype=String),
    Field("timestamp", dtype=Timestamp),
    Field("product_details", dtype=product_details_struct),
    # Map product category to a user preference score
    Field("user_preferences", dtype=Map(String, Float64)),
]

# Stream Source
ecommerce_source = StreamSource(
    name="ecommerce_events",
    schema=schema,
    stream_config=PushConfig(log_offline=True),
    description="Stream of e-commerce events, including user actions and product details.",
)
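Before sending events, it can help to check locally that a record has the shape this schema declares. The following is a minimal sketch in plain Python and is not part of the Tecton SDK: `validate_record` and `PRODUCT_DETAILS_FIELDS` are hypothetical names, and the checks only mirror the field names and types listed above. It assumes timestamps arrive as ISO 8601 strings.

```python
from datetime import datetime

# Hypothetical local mirror of the Struct fields above (not a Tecton API).
PRODUCT_DETAILS_FIELDS = {"product_id": str, "category": str, "price": float}


def validate_record(record: dict) -> list[str]:
    """Return a list of problems found in a candidate event record."""
    problems = []

    if not isinstance(record.get("user_id"), str):
        problems.append("user_id must be a string")

    # Normalise a trailing "Z" so datetime.fromisoformat accepts the value.
    try:
        datetime.fromisoformat(str(record.get("timestamp")).replace("Z", "+00:00"))
    except ValueError:
        problems.append("timestamp must be an ISO 8601 string")

    # Struct: a fixed set of named, typed fields.
    details = record.get("product_details")
    if not isinstance(details, dict):
        problems.append("product_details must be an object")
    else:
        for name, expected in PRODUCT_DETAILS_FIELDS.items():
            if not isinstance(details.get(name), expected):
                problems.append(f"product_details.{name} must be {expected.__name__}")

    # Map: arbitrary string keys, with every value of the same type.
    prefs = record.get("user_preferences")
    if not isinstance(prefs, dict):
        problems.append("user_preferences must be an object")
    else:
        for key, value in prefs.items():
            if not isinstance(key, str) or not isinstance(value, float):
                problems.append(f"user_preferences[{key!r}] must map a string to a float")

    return problems
```

The sketch is deliberately strict: an integer such as `50` fails the float check. Whether the service itself coerces integers to Float64 is not something this example establishes.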

Stream Feature View using a Struct Type

Below is an example Stream Feature View that uses the ecommerce_events Stream Source. It reads the price from the product_details Struct and computes the mean price of products in a user's cart over 1-day, 24-day, and 72-day windows.

from datetime import timedelta, datetime
from tecton import stream_feature_view, Aggregate, Entity
from tecton.types import Field, Float64, String, Timestamp

user = Entity(
    name="user",
    join_keys=[Field("user_id", String)],
    description="A user of the platform",
)


@stream_feature_view(
    source=ecommerce_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2024, 1, 1),
    mode="python",
    batch_schedule=timedelta(days=1),
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("price", dtype=Float64), function="mean", time_window=timedelta(days=1)),
        Aggregate(input_column=Field("price", dtype=Float64), function="mean", time_window=timedelta(days=24)),
        Aggregate(input_column=Field("price", dtype=Float64), function="mean", time_window=timedelta(days=72)),
    ],
    description="The average price of products in a user cart",
)
def average_product_price(event):
    # Pull the nested price out of the Struct so it can be aggregated.
    return {
        "user_id": event["user_id"],
        "timestamp": event["timestamp"],
        "price": float(event["product_details"]["price"]),
    }
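To make the data flow concrete, here is a rough plain-Python sketch of what the transformation and a windowed mean do to a few events. It does not use Tecton: `extract_price_row` repeats the transformation body without the decorator, and `windowed_mean` is a hypothetical helper. The window boundary `(as_of - window, as_of]` is an assumption made for illustration and may not match how Tecton evaluates aggregation windows.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean


def extract_price_row(event: dict) -> dict:
    """Same body as the transformation above, without the Tecton decorator."""
    return {
        "user_id": event["user_id"],
        "timestamp": event["timestamp"],
        "price": float(event["product_details"]["price"]),
    }


def windowed_mean(rows, user_id, as_of, window):
    """Illustrative mean of `price` for one user over (as_of - window, as_of]."""
    prices = [
        r["price"]
        for r in rows
        if r["user_id"] == user_id and as_of - window < r["timestamp"] <= as_of
    ]
    return mean(prices) if prices else None


now = datetime(2024, 1, 3, tzinfo=timezone.utc)
raw_events = [
    {"user_id": "C18947Z5", "timestamp": now - timedelta(hours=2),
     "product_details": {"product_id": "P1", "category": "food", "price": 50.0}},
    {"user_id": "C18947Z5", "timestamp": now - timedelta(hours=5),
     "product_details": {"product_id": "P2", "category": "food", "price": 30.0}},
    {"user_id": "C18947Z5", "timestamp": now - timedelta(days=3),
     "product_details": {"product_id": "P3", "category": "electronics", "price": 100.0}},
]
rows = [extract_price_row(e) for e in raw_events]

print(windowed_mean(rows, "C18947Z5", now, timedelta(days=1)))   # 40.0
print(windowed_mean(rows, "C18947Z5", now, timedelta(days=24)))  # 60.0
```

The 1-day window sees only the two recent events, while the 24-day window also includes the three-day-old purchase.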

Stream Feature View using a Map Type

Below is another Stream Feature View. It uses the Map field in the Stream Source to expose the user's preference score for a single category, in this case "electronics".

from datetime import timedelta, datetime
from tecton import stream_feature_view, Attribute, Entity
from tecton.types import Field, Float64, String, Timestamp

user = Entity(
    name="user",
    join_keys=[Field("user_id", String)],
    description="A user of the platform",
)


@stream_feature_view(
    source=ecommerce_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2024, 1, 1),
    mode="python",
    batch_schedule=timedelta(days=1),
    timestamp_field="timestamp",
    features=[Attribute("preference_score", dtype=Float64)],
    description="User preference score for the electronics category",
)
def electronics_preference_feature(event):
    return {
        "user_id": event["user_id"],
        "timestamp": event["timestamp"],
        # Default to 0.0 (a float, matching Float64) when the category is absent from the Map.
        "preference_score": event["user_preferences"].get("electronics", 0.0),
    }
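The Map lookup behaves like an ordinary dictionary lookup with a default. The sketch below runs the same logic outside Tecton on two sample events; `electronics_preference_row` is a hypothetical stand-in for the decorated function, and the explicit `float(...)` cast is an addition for illustration.

```python
def electronics_preference_row(event: dict) -> dict:
    """Mirror of the transformation above, without the Tecton decorator."""
    return {
        "user_id": event["user_id"],
        "timestamp": event["timestamp"],
        # A missing "electronics" key falls back to 0.0.
        "preference_score": float(event["user_preferences"].get("electronics", 0.0)),
    }


with_score = {
    "user_id": "C18947Z5",
    "timestamp": "2024-01-02T00:25:06Z",
    "user_preferences": {"electronics": 1.5, "food": 2.0},
}
without_score = {
    "user_id": "C18947Z5",
    "timestamp": "2024-01-02T00:30:00Z",
    "user_preferences": {"food": 2.0},
}

print(electronics_preference_row(with_score)["preference_score"])     # 1.5
print(electronics_preference_row(without_score)["preference_score"])  # 0.0
```

With this default, a user who has no electronics entry is indistinguishable from one whose score is exactly 0.0. If that distinction matters for your model, a different sentinel may be a better fit.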

Sample Event to Stream Ingest API

You can now send events for the ecommerce_events source to the Stream Ingest API. Below is an example curl request:

curl -X POST https://preview.<your_cluster>.tecton.ai/ingest \
  -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
'{
  "workspace_name": "ingestapi-test",
  "dry_run": false,
  "records": {
    "ecommerce_events": [
      {
        "record": {
          "timestamp": "2024-01-02T00:25:06Z",
          "user_id": "C18947Z5",
          "product_details": {
            "product_id": "P1",
            "category": "food",
            "price": 50.0
          },
          "user_preferences": {
            "electronics": 1.5,
            "food": 2.0
          }
        }
      }
    ]
  }
}'
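The same request can be assembled from Python. The sketch below uses only the standard library and mirrors the curl example's URL, header, and body layout. `build_ingest_request` is a hypothetical helper, not a Tecton client function, and the URL keeps the `<your_cluster>` placeholder, which you would need to replace before sending anything. The final `urlopen` call is commented out so the block only builds the request.

```python
import json
import os
import urllib.request

# Placeholder copied from the curl example; replace <your_cluster> before use.
INGEST_URL = "https://preview.<your_cluster>.tecton.ai/ingest"


def build_ingest_request(workspace, source_name, record, api_key, dry_run=False):
    """Build (but do not send) a POST request shaped like the curl example."""
    payload = {
        "workspace_name": workspace,
        "dry_run": dry_run,
        # Records are grouped by Stream Source name; each entry wraps one event.
        "records": {source_name: [{"record": record}]},
    }
    return urllib.request.Request(
        INGEST_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Tecton-key {api_key}"},
        method="POST",
    )


event = {
    "timestamp": "2024-01-02T00:25:06Z",
    "user_id": "C18947Z5",
    "product_details": {"product_id": "P1", "category": "food", "price": 50.0},
    "user_preferences": {"electronics": 1.5, "food": 2.0},
}
request = build_ingest_request(
    workspace="ingestapi-test",
    source_name="ecommerce_events",
    record=event,
    api_key=os.environ.get("TECTON_API_KEY", ""),
    dry_run=True,  # mirrors the "dry_run" field; set to False as in the curl example
)

# To actually send the request once the URL and API key are real:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it makes the payload easy to inspect or log before any data reaches the cluster.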
