Version: 0.8

Stream Data Sources with Stream Ingest API

This guide shows you how to create a StreamSource that processes records pushed to Tecton's Stream Ingest API endpoint. See Stream Data Sources with Kafka or Kinesis if you prefer Tecton to connect directly to your existing streaming infrastructure and pull records from it.

Defining a Stream Source with a PushConfig

A StreamSource is used by a StreamFeatureView to compute feature values from a continuous streaming data source. It supports the near real-time calculation of features with a freshness of less than 1s.

To enable point-in-time correct training data generation and backfills of newly created features from historical event data, a StreamSource must be configured with one of two possible offline event logs:

  • Self-managed offline event log: If you already maintain an offline log of historical events, you can simply point Tecton at that data source.
  • Tecton-managed offline event log: Alternatively, you can instruct the StreamSource to log all ingested events to the offline store.

You can set the following fields on a StreamSource:

  • stream_config: This configures your StreamSource's PushConfig and allows you to specify if you want Tecton to manage your offline event log.
  • batch_config: If you maintain an external offline event log, you use this parameter to point Tecton at it.
  • schema: This is the schema of the stream source, i.e. the schema of the events that will be sent to the Stream Ingest API. Tecton will use this schema to validate all ingested JSON records. Data in an existing offline event log must match the schema.
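To make the schema validation idea concrete, here is a minimal sketch of checking a JSON record against a schema before sending it. This is not Tecton's validation logic: the `SCHEMA` mapping, the `validate_record` helper, and the assumption that timestamps arrive as ISO 8601 strings are all hypothetical, chosen only to mirror the three-field transaction schema used in the examples in this guide.

```python
from datetime import datetime

# Hypothetical stand-in for a StreamSource schema: field name -> expected Python type.
# Assumption: timestamps are carried in the JSON record as ISO 8601 strings.
SCHEMA = {"user_id": str, "timestamp": datetime, "amt": float}


def validate_record(record: dict, schema: dict = SCHEMA) -> list:
    """Return a list of problems; an empty list means the record matches the schema."""
    problems = []
    for name, expected in schema.items():
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        if expected is datetime:
            try:
                datetime.fromisoformat(value)
            except (TypeError, ValueError):
                problems.append(f"{name}: not an ISO 8601 timestamp")
        elif expected is float:
            # bool is a subclass of int, so reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                problems.append(f"{name}: expected a number")
        elif not isinstance(value, expected):
            problems.append(f"{name}: expected {expected.__name__}")
    return problems


good = {"user_id": "u1", "timestamp": "2023-01-01T00:00:00", "amt": 12.5}
bad = {"user_id": 1, "timestamp": "x", "amt": "3"}
print(validate_record(good))
print(validate_record(bad))
```

Running a check like this on the client side can surface malformed records early, but the schema declared on the StreamSource remains the source of truth for what Tecton accepts.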

Example of a Stream Source with a self-managed offline event log

The following example declares a StreamSource that can be used to ingest transaction events. The historical event data lives in a Parquet file on S3, which batch_config points to.

from tecton import (
    FileConfig,
    stream_feature_view,
    StreamSource,
    PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=False),  # You don't want a Tecton-managed offline event log
    batch_config=FileConfig(  # This is the location of your self-managed offline event log
        uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)

See Tecton's Building Streaming Features tutorial for an end-to-end example.

Example of a Stream Source with a Tecton-managed offline event log

The following example declares a StreamSource that can be used to ingest transaction events. No batch_config is specified, so Tecton manages the offline event log for you.

from tecton import (
    FileConfig,
    stream_feature_view,
    StreamSource,
    PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=True),  # You do want a Tecton-managed offline event log
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
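Because the Stream Ingest API validates ingested JSON records against the schema, it can help to build each record from exactly the declared fields. The sketch below serializes one transaction event with the `user_id`, `timestamp`, and `amt` fields. The `to_json_record` helper is hypothetical, and encoding the timestamp as a UTC ISO 8601 string is an assumption; the request envelope, endpoint URL, and authentication are not covered by this guide and are deliberately left out.

```python
import json
from datetime import datetime, timezone


def to_json_record(user_id: str, event_time: datetime, amt: float) -> str:
    """Serialize one transaction event using the three fields declared in the schema."""
    record = {
        "user_id": user_id,
        # Assumption: timestamps are sent as ISO 8601 strings normalized to UTC.
        # A naive datetime would be interpreted as local time by astimezone().
        "timestamp": event_time.astimezone(timezone.utc).isoformat(),
        # Coerce to float so integer amounts still match a Float64 field.
        "amt": float(amt),
    }
    return json.dumps(record)


payload = to_json_record(
    "user_123",
    datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc),
    42,
)
print(payload)
```

Passing a timezone-aware datetime avoids ambiguity about which instant the event refers to.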
