Version: 1.0

Stream Data Sources with Stream Ingest API

This guide shows you how to create a StreamSource that processes records pushed to Tecton's Stream Ingest API endpoint.

See Stream Data Sources with Kafka or Kinesis if you prefer Tecton to connect directly to your existing streaming infrastructure and pull records from it.

Defining a Stream Source with a PushConfig

A StreamSource is used by a StreamFeatureView to compute feature values from a continuous streaming data source. It supports the near real-time calculation of features with a freshness of less than 1s.

To enable point-in-time correct training data generation and backfills of newly created features from historical event data, a StreamSource must be configured with one of two offline event logs:

  • Self-managed offline event log: If you already maintain an offline log of historical events, you can simply point Tecton at that data source.
  • Tecton-managed offline event log: Alternatively, you can instruct the StreamSource to log all ingested events to the offline store.
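
To illustrate why a timestamped offline event log is needed, here is a minimal sketch of a point-in-time correct lookup in plain Python. It does not use Tecton; the sample events and the helper amt_sum_as_of are hypothetical and exist only for illustration. The idea is that a training row observed at a given time may only see events at or before that time.

```python
from datetime import datetime

# Hypothetical historical events, shaped like the transaction records in this guide.
events = [
    {"user_id": "u1", "timestamp": datetime(2024, 1, 1, 12, 0), "amt": 10.0},
    {"user_id": "u1", "timestamp": datetime(2024, 1, 2, 12, 0), "amt": 25.0},
    {"user_id": "u1", "timestamp": datetime(2024, 1, 3, 12, 0), "amt": 40.0},
]


def amt_sum_as_of(events, user_id, as_of):
    """Sum amt over a user's events whose timestamp is at or before as_of."""
    return sum(
        e["amt"]
        for e in events
        if e["user_id"] == user_id and e["timestamp"] <= as_of
    )


# The event from January 3 is in the future relative to as_of, so it is excluded.
total = amt_sum_as_of(events, "u1", datetime(2024, 1, 2, 18, 0))
print(total)  # 35.0
```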

You can set the following fields on a StreamSource:

  • stream_config: This configures your StreamSource's PushConfig and lets you specify whether Tecton should manage your offline event log.
  • batch_config: If you maintain an external offline event log, use this parameter to point Tecton at it.
  • schema: This is the schema of the stream source, i.e. the schema of the events that will be sent to the Stream Ingest API. Tecton will use this schema to validate all ingested JSON records. Data in an existing offline event log must match the schema.
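
As a rough illustration of what validating a JSON record against a schema involves, the sketch below checks a record for the three fields used in this guide's examples. This is not Tecton's validation logic: the EXPECTED mapping and validate_record helper are hypothetical, and the sketch assumes timestamps arrive as ISO 8601 strings.

```python
from datetime import datetime

# Hypothetical mirror of the example schema: field name -> expected Python type
# after JSON decoding. Timestamps are assumed to be ISO 8601 strings.
EXPECTED = {"user_id": str, "timestamp": str, "amt": float}


def validate_record(record):
    """Return a list of problems found in the record; empty means it passed."""
    errors = []
    for name, expected_type in EXPECTED.items():
        if name not in record:
            errors.append(f"missing field: {name}")
        elif not isinstance(record[name], expected_type):
            errors.append(f"wrong type for {name}")
    # Only try to parse the timestamp if it is present and a string.
    if isinstance(record.get("timestamp"), str):
        try:
            datetime.fromisoformat(record["timestamp"])
        except ValueError:
            errors.append("timestamp is not ISO 8601")
    return errors


good = {"user_id": "u1", "timestamp": "2024-01-01T12:00:00+00:00", "amt": 12.5}
bad = {"user_id": "u1", "amt": "12"}
print(validate_record(good))
print(validate_record(bad))
```

Running a check like this on the client before pushing records can surface malformed events early, though the authoritative validation is whatever the Stream Ingest API applies.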

Example of a Stream Source with a self-managed offline event log

The following example declares a StreamSource that can be used to ingest transaction events. The historical event data is read from a Parquet file on S3.

from tecton import (
    FileConfig,
    stream_feature_view,
    StreamSource,
    PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=False),  # You don't want a Tecton-managed offline event log
    batch_config=FileConfig(  # This is the location of your self-managed offline event log
        uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)

See Tecton's Building Streaming Features tutorial for an end-to-end example.

Example of a Stream Source with a Tecton-managed offline event log

The following example declares a StreamSource that can be used to ingest transaction events. No batch_config is specified and log_offline is set to True, so Tecton manages the offline event log for you.

from tecton import (
    FileConfig,
    stream_feature_view,
    StreamSource,
    PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=True),  # You do want a Tecton-managed offline event log
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)

Example of a Stream Source with a Post Processor

This example shows how to post-process data for a StreamSource using Python functions. Post-processing lets you transform or enrich incoming records before they are stored or used in your Feature View transformations. The following fields must be set in the PushConfig when using a post processor:

  • input_schema: The input schema for the post processor, which can be different from the StreamSource's schema
  • post_processor: A user-defined function that processes records coming from the source; the output of the post_processor is used in subsequent stages.
  • post_processor_mode: The post processor execution mode, which must be either python or pandas

from tecton import (
    FileConfig,
    stream_feature_view,
    StreamSource,
    PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64, Int64

schema = [Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)]

# Input schema for the post processor
input_schema = [Field("user_uuid", Int64), Field("timestamp", Timestamp), Field("amt", Float64)]


def my_post_processor(row):
    if row["amt"] is None:
        # Add default amt of 0 for users with missing amount
        row["amt"] = 0.0
    else:
        # Cast amt to Float64
        row["amt"] = float(row["amt"])
    # Derive the string user_id expected by the StreamSource schema
    row["user_id"] = str(row["user_uuid"])
    return row


stream_config = PushConfig(
    log_offline=True,
    post_processor=my_post_processor,
    post_processor_mode="python",
    input_schema=input_schema,
)
transactions = StreamSource(
    name="transactions",
    stream_config=stream_config,  # Pass the config that carries the post processor
    schema=schema,
)
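
Because the post processor is an ordinary Python function, its logic can be exercised locally before it is attached to a PushConfig. The sketch below repeats the function from the example and calls it on two hand-written sample rows. It assumes that in python mode each record reaches the function as a dict keyed by field name; the sample values are made up for illustration.

```python
def my_post_processor(row):
    if row["amt"] is None:
        # Add default amt of 0 for users with missing amount
        row["amt"] = 0.0
    else:
        # Cast amt to a Python float
        row["amt"] = float(row["amt"])
    # Derive the string user_id from the integer user_uuid
    row["user_id"] = str(row["user_uuid"])
    return row


# Hypothetical input rows matching the input_schema (user_uuid, timestamp, amt).
with_amt = my_post_processor(
    {"user_uuid": 42, "timestamp": "2024-01-01T12:00:00+00:00", "amt": 7}
)
missing_amt = my_post_processor(
    {"user_uuid": 43, "timestamp": "2024-01-01T12:05:00+00:00", "amt": None}
)

print(with_amt["user_id"], with_amt["amt"])  # 42 7.0
print(missing_amt["user_id"], missing_amt["amt"])  # 43 0.0
```

Note that the function as written leaves the original user_uuid key in the returned row alongside the new user_id.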
