Publish Features
This feature is currently in Public Preview.
Background
Historical feature data can be valuable for feature selection, model tuning, and experiment analysis. It enables feature time travel, the process of reconstructing all features of a record as it existed at specific points in time. It also helps identify feature and model drift, which can be important in dynamic systems where feature relevance evolves over time.
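One general way to quantify feature drift from historical feature data is to compare a feature's distribution across two periods. The sketch below uses the Population Stability Index (PSI), a common technique that is not part of Tecton; the function name, bin edges, and sample values are illustrative assumptions.

```python
import bisect
import math
from typing import Sequence


def population_stability_index(
    expected: Sequence[float],
    actual: Sequence[float],
    bin_edges: Sequence[float],
    eps: float = 1e-6,
) -> float:
    """PSI = sum_i (a_i - e_i) * ln(a_i / e_i), where e_i and a_i are the
    fractions of the baseline and current samples that fall into bin i."""
    n_bins = len(bin_edges) - 1

    def bin_fractions(values: Sequence[float]) -> list[float]:
        counts = [0] * n_bins
        for v in values:
            # bisect_right locates the bin [edge_k, edge_k+1) containing v;
            # values outside the edges are clamped into the first/last bin.
            k = bisect.bisect_right(bin_edges, v) - 1
            counts[min(max(k, 0), n_bins - 1)] += 1
        # eps keeps empty bins from producing log(0) or division by zero.
        return [max(c / len(values), eps) for c in counts]

    e = bin_fractions(expected)
    a = bin_fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


# Hypothetical samples of one feature from two historical periods.
baseline = [1.0, 1.0, 4.0, 4.0]
current = [1.0, 1.0, 1.0, 4.0]
print(round(population_stability_index(baseline, current, [0.0, 2.5, 5.0]), 4))  # prints 0.2747
```

Identical distributions give a PSI of 0; larger values indicate a larger shift between the two periods.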
Tecton's SDK methods can retrieve historical features for feature development and training data generation. These methods require either an input events dataframe or a set of time ranges.
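Conceptually, retrieving historical features for an events dataframe is a point-in-time join: each event is matched with the feature values that were valid at the event's timestamp. The following is a minimal plain-Python sketch of that idea over in-memory lists, not Tecton's implementation; the function name and feature column names are hypothetical, and matching rows "at or before" the event time is an assumed boundary convention.

```python
import bisect
from collections import defaultdict
from datetime import datetime
from typing import Any, Optional


def point_in_time_join(
    events: list[tuple[str, datetime]],
    feature_rows: list[tuple[str, datetime, dict[str, Any]]],
) -> list[tuple[str, datetime, Optional[dict[str, Any]]]]:
    """For each (entity_id, event_time), attach the latest feature row whose
    timestamp is at or before event_time, or None if no such row exists."""
    # Group feature history per entity and sort it by timestamp.
    history: dict[str, list[tuple[datetime, dict[str, Any]]]] = defaultdict(list)
    for entity_id, ts, values in feature_rows:
        history[entity_id].append((ts, values))
    timestamps: dict[str, list[datetime]] = {}
    for entity_id, rows in history.items():
        rows.sort(key=lambda r: r[0])
        timestamps[entity_id] = [ts for ts, _ in rows]

    joined = []
    for entity_id, event_time in events:
        ts_list = timestamps.get(entity_id, [])
        # Index of the last feature timestamp <= event_time (-1 if none).
        i = bisect.bisect_right(ts_list, event_time) - 1
        values = history[entity_id][i][1] if i >= 0 else None
        joined.append((entity_id, event_time, values))
    return joined
```

Because each entity's history is sorted once and then searched with binary search, every event lookup costs O(log n) instead of a full scan.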
Alternatively, Tecton provides methods to publish feature data for Batch and Stream Feature Views to a data warehouse or to a custom sink.
When publishing to a data warehouse, Tecton publishes to S3. From there, users can configure external tables in their data warehouse, or use any other query engine that can read data on S3, to access the feature data.
When publishing to a custom sink, users write features to their own storage via a user-defined function that receives the feature data as input.
Publishing full features provides more flexibility to explore feature data from any environment. In addition, when the logic or underlying source data of a Feature View changes, its feature data is also automatically republished.
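Because feature data can be republished automatically, a custom sink may receive rows for an interval it has already written. A minimal sketch of one way to handle this, assuming the target store supports keyed upserts, is to key rows by entity and timestamp so a republish overwrites rather than duplicates. The `UpsertStore` class and the column names are hypothetical stand-ins, not part of Tecton.

```python
from datetime import datetime
from typing import Any


class UpsertStore:
    """Hypothetical in-memory stand-in for a custom feature store.

    Rows are keyed by (user_id, timestamp), so writing the same interval twice
    (for example, after a republish) replaces values instead of duplicating them.
    """

    def __init__(self) -> None:
        self.rows: dict[tuple[str, datetime], dict[str, Any]] = {}

    def write(self, rows: list[dict[str, Any]]) -> None:
        for row in rows:
            key = (row["user_id"], row["timestamp"])
            # Last write wins: a republished row overwrites the earlier version.
            self.rows[key] = dict(row)


store = UpsertStore()
store.write([{"user_id": "u1", "timestamp": datetime(2023, 1, 1), "amt_mean_1d": 10.0}])
# Republish of the same interval with corrected values.
store.write([{"user_id": "u1", "timestamp": datetime(2023, 1, 1), "amt_mean_1d": 12.5}])
print(len(store.rows))  # prints 1
```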
Enable Publishing Features
To publish full features to S3 or to a custom sink:
- Ensure offline materialization is enabled (`offline=True`).
- In the Feature View's `PublishFeaturesConfig`, set `publish_offline=True` to publish full features to Tecton's offline store. To write to your own custom store, define a sink and attach it via the `publish_sink` parameter. See the examples below.

Note that this starts new Feature Publish jobs that pre-compute full features for the entire history of the Feature View. Optionally, set `publish_start_time` in the Feature View's `PublishFeaturesConfig` to limit the range of historical feature data to publish.
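To illustrate how `publish_start_time` narrows the initial backfill, the sketch below enumerates the daily windows an initial publish would cover, using the dates from the examples that follow. It assumes daily publish blocks aligned to the start time; `backfill_windows` is a hypothetical helper, not a Tecton API.

```python
from datetime import datetime, timedelta
from typing import Optional


def backfill_windows(
    feature_start_time: datetime,
    end_time: datetime,
    interval: timedelta,
    publish_start_time: Optional[datetime] = None,
) -> list[tuple[datetime, datetime]]:
    """Hypothetical: return the [start, end) windows an initial publish backfill
    would cover. Without publish_start_time the backfill begins at
    feature_start_time; with it, earlier windows are skipped."""
    if publish_start_time is None:
        start = feature_start_time
    else:
        start = max(feature_start_time, publish_start_time)
    windows = []
    # Only complete windows that end at or before end_time are included.
    while start + interval <= end_time:
        windows.append((start, start + interval))
        start += interval
    return windows


full = backfill_windows(datetime(2022, 1, 1), datetime(2024, 1, 1), timedelta(days=1))
limited = backfill_windows(
    datetime(2022, 1, 1), datetime(2024, 1, 1), timedelta(days=1),
    publish_start_time=datetime(2023, 1, 1),
)
print(len(full), len(limited))  # prints 730 365
```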
Spark:

```python
from datetime import datetime, timedelta

from tecton import Aggregate, PublishFeaturesConfig, Secret, batch_feature_view, sink_config
from tecton.types import Field, Int32

# `transactions` (a batch data source) and `user` (an entity) are defined
# elsewhere in the feature repository.


@sink_config(mode="pyspark", secrets={"api_key": Secret(scope="credentials", key="API_KEY")})
def sink_output(df, context):
    """Write `df` to a custom store. Credentials are available in `context.secrets["api_key"]`."""


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(days=1),
    timestamp_field="timestamp",
    features=[
        Aggregate(function="mean", input_column=Field("amt", Int32), time_window=timedelta(days=1)),
        Aggregate(function="mean", input_column=Field("amt", Int32), time_window=timedelta(days=3)),
        Aggregate(function="mean", input_column=Field("amt", Int32), time_window=timedelta(days=7)),
        Aggregate(function="count", input_column=Field("transaction", Int32), time_window=timedelta(days=1)),
        Aggregate(function="count", input_column=Field("transaction", Int32), time_window=timedelta(days=3)),
        Aggregate(function="count", input_column=Field("transaction", Int32), time_window=timedelta(days=7)),
    ],
    feature_start_time=datetime(2022, 1, 1),
    offline=True,  # offline materialization is required for publishing
    publish_features_config=PublishFeaturesConfig(
        publish_start_time=datetime(2023, 1, 1),  # only publish history from 2023 onward
        publish_sink=sink_output,  # write to the custom sink defined above
        publish_offline=True,  # also publish to Tecton's offline store
    ),
)
def user_transaction_metrics(transactions):
    return f"""
        SELECT user_id, timestamp, amt, 1 AS transaction
        FROM {transactions}
        """
```
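Rift:

```python
from datetime import datetime, timedelta

from tecton import Aggregate, PublishFeaturesConfig, Secret, batch_feature_view, sink_config
from tecton.types import Field, Int32

# `transactions` (a batch data source) and `user` (an entity) are defined
# elsewhere in the feature repository.


@sink_config(mode="pandas", secrets={"api_key": Secret(scope="credentials", key="API_KEY")})
def sink_output(df, context):
    """Write `df` to a custom store. Credentials are available in `context.secrets["api_key"]`."""


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="pandas",
    aggregation_interval=timedelta(days=1),
    timestamp_field="timestamp",
    features=[
        Aggregate(function="mean", input_column=Field("amt", Int32), time_window=timedelta(days=1)),
        Aggregate(function="mean", input_column=Field("amt", Int32), time_window=timedelta(days=3)),
        Aggregate(function="mean", input_column=Field("amt", Int32), time_window=timedelta(days=7)),
        Aggregate(function="count", input_column=Field("transaction", Int32), time_window=timedelta(days=1)),
        Aggregate(function="count", input_column=Field("transaction", Int32), time_window=timedelta(days=3)),
        Aggregate(function="count", input_column=Field("transaction", Int32), time_window=timedelta(days=7)),
    ],
    feature_start_time=datetime(2022, 1, 1),
    offline=True,  # offline materialization is required for publishing
    publish_features_config=PublishFeaturesConfig(
        publish_start_time=datetime(2023, 1, 1),  # only publish history from 2023 onward
        publish_sink=sink_output,  # write to the custom sink defined above
        publish_offline=True,  # also publish to Tecton's offline store
    ),
)
def user_transaction_metrics(transactions):
    # Add a constant `transaction` column so the count aggregations have an input column.
    return transactions[["user_id", "timestamp", "amt"]].assign(transaction=1)
```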
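To make the aggregations in these examples concrete, the sketch below computes what the 1-, 3-, and 7-day `mean` of `amt` and `count` of transactions would be for a single user at one point in time. It is a plain-Python illustration, not how Tecton computes aggregates; the function name and output keys are hypothetical, and the `[as_of - window, as_of)` window boundary is an assumed convention.

```python
from datetime import datetime, timedelta
from statistics import mean
from typing import Optional


def trailing_aggregates(
    transactions: list[tuple[datetime, float]],
    as_of: datetime,
    windows_days: tuple[int, ...] = (1, 3, 7),
) -> dict[str, Optional[float]]:
    """Hypothetical: mean(amt) and count of transactions for one user over
    trailing windows ending at as_of."""
    result: dict[str, Optional[float]] = {}
    for days in windows_days:
        window_start = as_of - timedelta(days=days)
        # Assumed window convention: [as_of - window, as_of).
        amts = [amt for ts, amt in transactions if window_start <= ts < as_of]
        result[f"amt_mean_{days}d"] = mean(amts) if amts else None
        result[f"transaction_count_{days}d"] = len(amts)
    return result


txns = [
    (datetime(2023, 1, 1, 12), 10),
    (datetime(2023, 1, 5, 8), 20),
    (datetime(2023, 1, 6, 9), 30),
]
print(trailing_aggregates(txns, as_of=datetime(2023, 1, 7)))
```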
Feature Publish Jobs
Tecton schedules Feature Publish jobs for a time interval once the data in that interval has been successfully materialized to the offline store.
For example, if `batch_schedule` is set to one day, Tecton schedules a Feature Publish job after each daily materialization job completes successfully. This applies whether publishing is enabled to S3, to a custom sink, or both. Tecton tries to publish the feature data in blocks that match the task windows.
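The scheduling rule described above can be sketched as a simple eligibility check: a window's publish job becomes eligible only after that window has materialized successfully and if it has not already been published. This is a hypothetical model for illustration; the function name and the status strings are assumptions, not Tecton's internal states.

```python
from datetime import datetime


def windows_ready_to_publish(
    materialization_status: dict[datetime, str],
    published: set[datetime],
) -> list[datetime]:
    """Hypothetical: return window start times whose offline materialization
    succeeded and that have not been published yet, in chronological order."""
    return sorted(
        start
        for start, status in materialization_status.items()
        if status == "SUCCESS" and start not in published
    )


status = {
    datetime(2023, 1, 1): "SUCCESS",
    datetime(2023, 1, 2): "SUCCESS",
    datetime(2023, 1, 3): "RUNNING",  # not yet eligible
    datetime(2023, 1, 4): "FAILED",   # not eligible until rematerialized
}
print(windows_ready_to_publish(status, published={datetime(2023, 1, 1)}))
```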
Tecton also republishes feature data when necessary. For example, if a user manually triggers rematerialization of a time interval from months ago, Tecton schedules jobs to republish all features that depend on that interval's data once the interval has been successfully rematerialized.
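With trailing aggregations, rematerializing one interval can affect published features for several later intervals. The sketch below models which daily publish windows would need republishing, assuming a publish window starting on day `d` depends on source data from the trailing `max_time_window` ending with day `d` (7 days in the examples above). This dependency model and the `windows_to_republish` helper are hypothetical illustrations, not Tecton's actual republish logic.

```python
from datetime import datetime, timedelta


def windows_to_republish(
    rematerialized_start: datetime,
    rematerialized_end: datetime,
    max_time_window: timedelta,
    interval: timedelta = timedelta(days=1),
) -> list[datetime]:
    """Hypothetical: publish-window start times affected by rematerializing
    [rematerialized_start, rematerialized_end).

    Assumes the publish window starting at d depends on source data in
    [d - max_time_window + interval, d + interval).
    """
    # Last rematerialized window, pushed forward by the trailing lookback.
    last_affected = (rematerialized_end - interval) + (max_time_window - interval)
    windows = []
    start = rematerialized_start
    while start <= last_affected:
        windows.append(start)
        start += interval
    return windows


# Rematerializing a single day affects that day's window and the next six.
affected = windows_to_republish(datetime(2023, 3, 1), datetime(2023, 3, 2), timedelta(days=7))
print(len(affected))  # prints 7
```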
