Stream Data Sources with Stream Ingest API
This guide shows you how to create a StreamSource that processes records pushed to Tecton's Stream Ingest API endpoint.
See Stream Data Sources with Kafka or Kinesis if you prefer Tecton to connect directly to your existing streaming infrastructure and pull records from it.
Defining a Stream Source with a PushConfig
A StreamSource is used by a StreamFeatureView to compute feature values from a continuous streaming data source. It supports near-real-time calculation of features with a freshness of less than one second.
To enable point-in-time correct training data generation and backfills of newly created features using historical event data, a StreamSource needs to be configured with a self-managed offline event log, which maintains a copy of stream events.
You can set the following fields on a StreamSource:
- stream_config: This configures your StreamSource's PushConfig, which lets you specify a post_processor_mode, an input_schema, and a timestamp_field.
- batch_config: Use this parameter to point Tecton at a self-managed offline event log.
- schema: This is the schema of the stream source, i.e. the schema of the events that will be sent to the Stream Ingest API. Tecton will use this schema to validate all ingested JSON records. Data in an existing offline event log must match the schema.
Example of a Stream Source with a self-managed offline event log
The following example declares a StreamSource that can be used to ingest transaction events. As you can see, the historical event data can be found in a Parquet file on S3.
from tecton import (
    FileConfig,
    stream_feature_view,
    StreamSource,
    PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=False),
    batch_config=FileConfig(  # This is the location of your self-managed offline event log
        uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
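The StreamSource above can then feed a StreamFeatureView. The following is a rough sketch rather than a canonical definition: the user entity, the aggregation, and the decorator arguments shown here are illustrative assumptions, and the exact parameter names vary across Tecton SDK versions, so follow the StreamFeatureView reference for your release.

from datetime import datetime, timedelta

from tecton import Aggregation, Entity, stream_feature_view
from tecton.types import Field, String, Timestamp, Float64

# Hypothetical entity; the join key must match a column in the source schema.
user = Entity(name="user", join_keys=["user_id"])


# Rough sketch: sum of transaction amounts per user over the last hour.
@stream_feature_view(
    source=transactions,
    entities=[user],
    mode="pandas",
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
    aggregations=[Aggregation(column="amt", function="sum", time_window=timedelta(hours=1))],
    online=True,
    offline=True,
    feature_start_time=datetime(2023, 1, 1),
)
def user_transaction_amount_totals(transactions):
    return transactions[["user_id", "timestamp", "amt"]]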
Please see Tecton's Building Streaming Features tutorial for an end-to-end example.
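Once the source is applied to a workspace, records can be pushed to it over HTTP via the Stream Ingest API. The request below is only a sketch: the endpoint URL, authorization header, and payload shape are assumptions for illustration, so consult the Stream Ingest API reference for the exact request format for your cluster.

import requests

# Illustrative only: URL, auth header, and payload shape are assumptions,
# not the documented request format.
resp = requests.post(
    "https://<your-cluster>.tecton.ai/ingest",  # placeholder cluster URL
    headers={"Authorization": "Tecton-key <YOUR_API_KEY>"},
    json={
        "workspace_name": "<your-workspace>",
        "dry_run": False,
        "records": {
            # Keyed by the StreamSource name declared above.
            "transactions": [
                {
                    "record": {
                        "user_id": "user_123",
                        "timestamp": "2023-01-04T00:25:06Z",
                        "amt": 50.0,
                    }
                }
            ],
        },
    },
)
resp.raise_for_status()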
Example of a Stream Source with a Tecton-managed offline event log
Tecton-managed offline event logging in a StreamSource is deprecated and will be removed in a future Tecton SDK release. Please specify a batch_config on a StreamSource for training data generation.
The following example declares a StreamSource that can be used to ingest transaction events. As you can see, no batch_config is specified; Tecton manages the offline event log for you.
from tecton import (
    FileConfig,
    stream_feature_view,
    StreamSource,
    PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=True),  # You do want a Tecton-managed offline event log
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
Example of a Stream Source with a Post Processor
This example shows how to post-process data for a StreamSource using Python functions. Post-processing allows you to transform or enrich incoming data before it is stored or used further in your Feature View transformations. The following fields must be set in the PushConfig when using a post processor:
- input_schema: The input schema for the post processor, which can be different from the StreamSource's schema.
- post_processor: A user-defined function that processes records coming from the source; the output of the post_processor is used in subsequent stages.
- post_processor_mode: The post processor execution mode; must be one of python or pandas.
from tecton import (
    FileConfig,
    stream_feature_view,
    StreamSource,
    PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64, Int64

schema = [Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)]

# Input schema for the post processor
input_schema = [Field("user_uuid", Int64), Field("timestamp", Timestamp), Field("amt", Float64)]


def my_post_processor(row):
    if row["amt"] is None:
        # Add default amt of 0 for users with missing amount
        row["amt"] = 0.0
    else:
        # Cast amt to Float64
        row["amt"] = float(row["amt"])
    row["user_id"] = str(row["user_uuid"])
    return row


stream_config = PushConfig(
    post_processor=my_post_processor,
    post_processor_mode="python",
    input_schema=input_schema,
)

transactions = StreamSource(
    name="transactions",
    stream_config=stream_config,  # Use the PushConfig defined above so the post processor runs on ingested records
    schema=schema,
)
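To make the transformation concrete, here is what my_post_processor does to a single hypothetical record, shown as a plain Python dict matching input_schema:

# Hypothetical incoming record: user_uuid is an integer and amt is missing.
raw = {"user_uuid": 1234, "timestamp": "2023-01-04T00:25:06Z", "amt": None}

processed = my_post_processor(raw)
# processed == {
#     "user_uuid": 1234,
#     "timestamp": "2023-01-04T00:25:06Z",
#     "amt": 0.0,         # missing amount defaulted to 0.0
#     "user_id": "1234",  # user_uuid cast to the string user_id expected by the schema
# }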