Streaming Materialization with Spark
Tecton supports stream processing exclusively in Spark compute mode, via Spark Structured Streaming.
Changing instance profiles is a breaking change that requires a manual restart: if you change the instance profile and simply retry the running streaming job, the retry will fail. Instead, cancel the current streaming job in the web UI, then click "Retry" to create a new streaming job with the updated instance profile.
The Data Processing Model
Within Tecton, Stream Data Sources (Kafka or Kinesis) are always registered with an associated batch source representing an accurate historical record of the stream.
transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=KinesisSource(...),
    batch_config=HiveSource(...),  # historical record of stream transactions
)
Tecton leverages the stream's batch data source for the following:
- Bootstrapping the online store during a backfill
- Calculating historical features on-demand during experimentation
- Materializing features to the offline store
Tecton uses the stream data source to materialize real-time features to the online store.
Data Processing and Late-Arriving Data
Online Data with Stream Processing
Tecton sets up a Spark Structured Streaming job to process data from the stream source and then write it to the online store. This Structured Streaming job operates in “Update” mode, meaning that the acceptance of late-arriving data is managed by the watermark setting.
The watermark is determined by the latest observed timestamp in the configured watermark column, and the configured watermark delay threshold dictates which late data may be discarded: data with a timestamp earlier than the watermark minus the delay threshold may be dropped. Note that this guarantee is one-directional. Data later than the watermark minus the delay threshold is assured not to be dropped, but there is no guarantee that data earlier than this point will actually be discarded (refer to the Spark documentation on Semantic Guarantees of Aggregation with Watermarking).
For instance, if you set a watermark period of 10 minutes on the column ts, and the latest observed event has ts=00:15:00, any data with a timestamp at or before 00:05:00 might be dropped.
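Under the hood, this corresponds to Spark's withWatermark on the streaming DataFrame. The following is a minimal sketch of that mechanism (not Tecton's internal code); the Kafka source, broker address, and column names are illustrative assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative stream read; in Tecton this is wired up from the StreamSource's
# stream_config (Kafka or Kinesis).
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
    .selectExpr("CAST(value AS STRING) AS payload", "timestamp AS ts")
)

# With a 10-minute watermark on ts, once an event with ts=00:15:00 has been
# observed, rows with ts <= 00:05:00 may be dropped from the streaming state.
watermarked = events.withWatermark("ts", "10 minutes")

# "Update" output mode: late rows still inside the watermark update previously
# emitted results. The console sink here stands in for the online store.
query = (
    watermarked.writeStream
    .outputMode("update")
    .format("console")
    .start()
)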
If no watermark is set, late-arriving data is always accepted. However, it is strongly advised (and nearly mandatory) to use a watermark when using the deduplication_columns option or time-interval aggregations. Neglecting to do so will cause the Spark state store to grow excessively large, resulting in degraded performance and potentially causing your streaming application to exhaust memory, even after a restart. This is because these options require Spark to keep all previous data within the watermark period in memory in order to update aggregates or deduplicate events.
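Continuing the sketch above, the underlying Spark pattern for deduplication looks roughly like this (again illustrative, not Tecton's code; transaction_id is an assumed column):

# Without a watermark, dropDuplicates must retain every key it has ever seen in
# the state store; with one, state older than the watermark can be evicted.
deduped = (
    events
    .withWatermark("ts", "10 minutes")
    .dropDuplicates(["transaction_id", "ts"])
)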
The watermark column must correspond to the timestamp column configured in your Feature View for correct operation. We advise using the event timestamp for this. This should also coincide with the timestamp used in your associated batch data source. Refer to the Spark documentation on Conditions for Watermarking to Clean Aggregation State for further details.
Offline Data with Batch Processing
Tecton materializes offline data through incremental Spark jobs that read from the stream's historical batch source. The frequency of this operation is determined by the batch_schedule configured on a Stream Feature View.
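For example, a Stream Feature View with a daily batch_schedule might be declared roughly as follows. This is a sketch only; the exact decorator parameters depend on your Tecton SDK version, and transactions_stream, the user entity, and the query are illustrative:

from datetime import datetime, timedelta
from tecton import stream_feature_view

@stream_feature_view(
    source=transactions_stream,          # the StreamSource defined above
    entities=[user],                     # assumed Entity
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2023, 1, 1),
    batch_schedule=timedelta(days=1),    # one incremental offline job per day
    ttl=timedelta(days=30),
)
def transaction_amounts(transactions):
    return f"SELECT user_id, amount, timestamp FROM {transactions}"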
A batch job will only write data for the timestamps included in its batch_schedule interval. Data that arrives after the batch job has run will not be written to the offline store by future jobs. To rectify previous time intervals, an overwrite job must be executed.
Please note that if you have an overwrite job that writes to the online store, timestamp adjustments may result in duplicate data.
Tecton processes all data present in the historical batch source at the time of a query (such as a materialization job or an ad-hoc query against source data). This means that data discarded by the stream's watermark delay threshold but written to the historical batch source may still appear in offline data: it will appear in any ad-hoc query from source data, and in the offline store if it was present in the batch source when the job for that batch schedule interval ran. Conversely, data that arrives within the stream watermark threshold but too late for the batch job will appear in the online store but not the offline store.
Initializing Online Data from the Historical Batch Source
Because the stream retention may be shorter than the amount of data required for your application, Tecton initializes online data for an initial backfill using the stream’s historical batch source.
For instance, if you have a Stream Feature View with a daily batch schedule and you start your streaming job at 01-02T01:00, Tecton will run batch jobs that write to the online store up until 01-02T00:00. For cold starting, your streaming data source must have a data retention period greater than or equal to your batch_schedule; otherwise data may be lost. If your streaming data source does not meet this requirement, we recommend using the materialization API to manually trigger a job that writes to the online store for the period after the start. In this case, you would trigger an online job covering 01-02T00:00 to 01-03T00:00.
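A manually triggered job for that gap could look roughly like this, assuming the trigger_materialization_job method available in recent Tecton SDKs (the method arguments and the workspace and Feature View names are illustrative and may differ in your version):

from datetime import datetime
import tecton

ws = tecton.get_workspace("prod")                    # assumed workspace name
fv = ws.get_feature_view("transaction_amounts")      # assumed Feature View name

# Backfill the online store for the interval the scheduled batch jobs did not
# cover (the year is chosen only for illustration).
job = fv.trigger_materialization_job(
    start_time=datetime(2023, 1, 2),
    end_time=datetime(2023, 1, 3),
    online=True,
    offline=False,
)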
Note that batch data always takes precedence over streaming data (when the entity and timestamp match), and your batch data is expected to align perfectly with your stream data.
Streaming Transformations and Online Writes
Stream Feature Views execute user-defined transformations and optional aggregations in a Spark Streaming job and write data to the online store. Note that the data written to the online store is not necessarily the final feature value. Tecton calculates some portion of aggregations at request time to compute features (more details on this in a subsequent section).
The data that is transformed and written to the online store, and the frequency of writes, are dependent on two factors from the Stream Feature View:
- Whether aggregations are defined
- The stream processing mode (i.e., StreamProcessingMode.TIME_INTERVAL vs. StreamProcessingMode.CONTINUOUS)
For more information on the benefits and trade-offs of time-windowed aggregations and stream processing modes, refer to the documentation on Stream Feature View Time-Windowed Aggregations.
No Aggregations
Time-Interval Stream Processing Mode
The Spark Structured Streaming job transforms all events and writes the transformed data to the online store. These transformations and writes are executed as micro-batches on a default trigger interval of 30 seconds. This means that every 30 seconds, the streaming job will transform a batch of raw events and write the results to the online store. As a result, feature freshness will have a lower bound of 30 seconds.
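In plain Spark terms, this corresponds to a processing-time trigger on the streaming write. A minimal sketch of the mechanism (not Tecton's code), given a transformed streaming DataFrame transformed_events:

# Micro-batch trigger: transform and write a batch of events every 30 seconds.
# (CONTINUOUS mode, described below, effectively uses a 0-second interval.)
query = (
    transformed_events.writeStream
    .outputMode("update")
    .format("console")                       # stand-in for the online store sink
    .trigger(processingTime="30 seconds")
    .start()
)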
Continuous Stream Processing Mode
In the CONTINUOUS stream processing mode, the Spark Structured Streaming job transforms all events and writes the transformed data to the online store, just as in TIME_INTERVAL mode without aggregations.
However, in CONTINUOUS mode Tecton employs a micro-batch time interval of 0 seconds. This means that new data is written to the online store immediately, leading to fresher feature values.
Be aware that this comes at the cost of higher checkpointing expenses.
Using Aggregations
Time-Interval Stream Processing Mode
Tecton time-interval aggregations represent sliding windows with a slide interval corresponding to the configured aggregation_interval on a Stream Feature View. Tecton computes these features by calculating partial aggregations for each slide interval and writing those partial aggregates to the online store. At request time, multiple partial aggregates get "rolled up" (i.e., aggregated) to compute a final feature value.
For instance, if you define a sum feature aggregation with a 30-minute time_window and a 1-minute aggregation_interval, then Tecton will compute and write 1-minute aggregations to the online store, grouped by the entities in the Stream Feature View. At request time, Tecton will aggregate up to 30 partial 1-minute aggregations for the requested entities to compute the 30-minute aggregation.
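Declared on a Stream Feature View, that configuration would look roughly like the following sketch; the Aggregation parameters follow recent Tecton SDK versions, and the source, entity, and query are illustrative:

from datetime import timedelta
from tecton import stream_feature_view, Aggregation

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    aggregation_interval=timedelta(minutes=1),   # slide interval of the partial aggregates
    aggregations=[
        # Rolled up at request time from up to 30 one-minute partial sums.
        Aggregation(column="amount", function="sum", time_window=timedelta(minutes=30)),
    ],
)
def user_transaction_totals(transactions):
    return f"SELECT user_id, amount, timestamp FROM {transactions}"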
The partial aggregates are computed and stored in memory as part of the Spark Structured Streaming job. Partial aggregate values are then written to the online store on every micro-batch trigger interval of 30 seconds. This means that Tecton will make one write to the online store per entity per micro-batch per time interval. Because of this, time-interval aggregations can lead to fewer online store writes (as compared to continuous aggregates) for high-volume streams where individual entities may receive many updates in a micro-batch.
If late-arriving data (subject to the watermark delay threshold) comes in for an old partial aggregate (i.e., an earlier slide interval), that partial aggregate will get updated.
Setting a watermark when using time-interval aggregations is highly recommended because Spark will hold partial aggregates for all entities within the watermark period in memory.
Continuous Stream Processing Mode
Continuous stream processing with aggregations allows for fresher aggregation values. In this mode, Tecton transforms and writes all events from the stream to the online store using a micro-batch interval of 0 seconds, meaning that new data is written to the online store immediately. Time-windowed aggregations are then computed entirely at request time for the requested entities.