Stream Data Sources with Stream Ingest API
This guide shows you how to create a StreamSource that processes records pushed to Tecton's Stream Ingest API endpoint.
See Stream Data Sources with Kafka or Kinesis if you prefer Tecton to connect directly to your existing streaming infrastructure and pull records from it.
Defining a Stream Source with a PushConfig
A StreamSource is used by a StreamFeatureView to compute feature values from a continuous streaming data source. It supports the near real-time calculation of features with a freshness of less than one second.
To enable point-in-time correct training data generation and backfills of newly created features using historical event data, a StreamSource must be configured with one of two possible offline event logs:
- Self-managed offline event log: If you already maintain an offline log of historical events, you can simply point Tecton at that data source.
- Tecton-managed offline event log: Alternatively, you can instruct the StreamSource to log all ingested events to the offline store.
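To make "point-in-time correct" concrete: when a training row is built for a label observed at some time, only events with timestamps at or before that time may contribute to the feature value. The following is a minimal sketch in plain Python, not Tecton code; the function name `amount_sum_as_of` and the sample events are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical offline event log for one user, sorted by event timestamp.
events = [
    (datetime(2024, 1, 1, 9, 0), 20.0),
    (datetime(2024, 1, 1, 12, 0), 35.5),
    (datetime(2024, 1, 2, 8, 30), 10.0),
]


def amount_sum_as_of(log, as_of):
    """Sum the amounts of all events at or before `as_of`.

    Events after `as_of` are excluded, so the value never uses future data.
    """
    timestamps = [ts for ts, _ in log]
    cutoff = bisect_right(timestamps, as_of)  # index of the first event after as_of
    return sum(amt for _, amt in log[:cutoff])


# Feature value as it would have been known at the label time.
label_time = datetime(2024, 1, 1, 18, 0)
print(amount_sum_as_of(events, label_time))
```

The event from January 2 is ignored for this label time because it had not happened yet. An offline event log is what makes this kind of lookup possible for historical timestamps.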
You can set the following fields on a StreamSource:
- stream_config: This configures your StreamSource's PushConfig and allows you to specify whether you want Tecton to manage your offline event log.
- batch_config: If you maintain an external offline event log, you use this parameter to point Tecton at it.
- schema: This is the schema of the stream source, i.e. the schema of the events that will be sent to the Stream Ingest API. Tecton will use this schema to validate all ingested JSON records. Data in an existing offline event log must match the schema.
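As an illustration of what validating a JSON record against a schema involves, here is a simplified sketch in plain Python. It does not use Tecton's validator, whose exact rules are not described in this guide. `EXPECTED_TYPES` and `validate_record` are hypothetical names, and the sketch assumes timestamps arrive as ISO 8601 strings.

```python
import json
from datetime import datetime


def _is_timestamp(value):
    """Return True if `value` is a string that parses as an ISO 8601 datetime."""
    if not isinstance(value, str):
        return False
    try:
        datetime.fromisoformat(value)
        return True
    except ValueError:
        return False


# Hypothetical stand-in for a schema: field name -> check applied to the JSON value.
EXPECTED_TYPES = {
    "user_id": lambda v: isinstance(v, str),
    "timestamp": _is_timestamp,
    # bool is a subclass of int in Python, so reject it explicitly.
    "amt": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
}


def validate_record(raw_json):
    """Return a list of problems found in one JSON-encoded record."""
    record = json.loads(raw_json)
    problems = []
    for name, check in EXPECTED_TYPES.items():
        if name not in record:
            problems.append(f"missing field: {name}")
        elif not check(record[name]):
            problems.append(f"wrong type for field: {name}")
    return problems


good = '{"user_id": "user_123", "timestamp": "2024-01-01T09:00:00", "amt": 20.5}'
bad = '{"user_id": 42, "timestamp": "yesterday"}'
print(validate_record(good))
print(validate_record(bad))
```

Running a check like this on sample payloads before sending them can surface mismatched field names or types early.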
Example of a Stream Source with a self-managed offline event log
The following example declares a StreamSource that can be used to ingest transaction events. The historical event data is stored in a Parquet file on S3.
from tecton import FileConfig, PushConfig, StreamSource
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=False),  # You don't want a Tecton-managed offline event log
    batch_config=FileConfig(  # This is the location of your self-managed offline event log
        uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
Please see Tecton's Building Streaming Features tutorial for an end-to-end example.
Example of a Stream Source with a Tecton-managed offline event log
The following example declares a StreamSource that can be used to ingest transaction events. No batch_config is specified, because Tecton manages the offline event log for you.
from tecton import PushConfig, StreamSource
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=True),  # You do want a Tecton-managed offline event log
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
Example of a Stream Source with a Post Processor
This example shows how to post-process data for a StreamSource using Python functions. Post-processing allows you to transform or enrich incoming data before it is stored or used further in your Feature View transformations. The following fields must be set in the PushConfig when using a post processor:
- input_schema: The input schema for the post processor, which can be different from the StreamSource's schema.
- post_processor: A user-defined function that processes each record pushed to the source.
- post_processor_mode: The post processor execution mode, which must be one of python or pandas.
from tecton import PushConfig, StreamSource
from tecton.types import Field, String, Timestamp, Float64, Int64

# Schema of the records after post-processing
schema = [Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)]

# Input schema for the post processor
input_schema = [Field("user_uuid", Int64), Field("timestamp", Timestamp), Field("amt", Float64)]


def my_post_processor(row):
    if row["amt"] is None:
        # Add default amt of 0 for users with missing amount
        row["amt"] = 0.0
    else:
        # Cast amt to Float64
        row["amt"] = float(row["amt"])
    # Derive the string user_id expected by the output schema
    row["user_id"] = str(row["user_uuid"])
    return row


stream_config = PushConfig(
    log_offline=True,
    post_processor=my_post_processor,
    post_processor_mode="python",
    input_schema=input_schema,
)

transactions = StreamSource(
    name="transactions",
    stream_config=stream_config,  # Pass the PushConfig that carries the post processor
    schema=schema,
)
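Because the post processor in this example is an ordinary Python function, its logic can be exercised locally on sample rows before the source is applied. The sketch below repeats the function and calls it with hypothetical input dictionaries shaped like input_schema. It assumes each row is passed as a plain `dict`, and it does not involve Tecton itself.

```python
def my_post_processor(row):
    if row["amt"] is None:
        # Add default amt of 0 for users with missing amount
        row["amt"] = 0.0
    else:
        # Cast amt to a Python float
        row["amt"] = float(row["amt"])
    row["user_id"] = str(row["user_uuid"])
    return row


# Hypothetical sample rows matching input_schema (user_uuid, timestamp, amt).
sample_rows = [
    {"user_uuid": 1001, "timestamp": "2024-01-01T09:00:00", "amt": None},
    {"user_uuid": 1002, "timestamp": "2024-01-01T09:05:00", "amt": 12},
]

for row in sample_rows:
    processed = my_post_processor(dict(row))  # copy so the sample stays unchanged
    print(processed["user_id"], processed["amt"])
```

The first row exercises the missing-amount branch and the second exercises the cast from an integer amount to a float.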