tecton.stream_feature_view

tecton.stream_feature_view(*, mode, inputs, entities, ttl, online=False, offline=False, feature_start_time=None, batch_schedule=None, description=None, owner=None, family=None, tags=None, batch_cluster_config=None, stream_cluster_config=None, offline_config=ParquetConfig(), online_config=None, monitoring=None, timestamp_key=None, name_override=None, max_batch_aggregation_interval=None, online_serving_index=None, output_stream=None)

Declare a stream feature view

Parameters
  • mode (str) – Whether the annotated function is a pipeline function (“pipeline” mode) or a transformation function (“spark_sql” or “pyspark” mode). For the non-pipeline modes, an inferred transformation will also be registered.

  • inputs (Dict[str, Input]) – The inputs passed into the pipeline.

  • entities (List[Union[Entity, OverriddenEntity]]) – The entities this feature view is associated with.

  • ttl (str) – The TTL (or “look back window”) for features defined by this feature view. This parameter determines how long features will live in the online store and how far to “look back” relative to a training example’s timestamp when generating offline training sets. Shorter TTLs improve performance and reduce costs.

  • online (Optional[bool]) – Whether the feature view should be materialized to the online feature store. (Default: False)

  • offline (Optional[bool]) – Whether the feature view should be materialized to the offline feature store. (Default: False)

  • feature_start_time (Union[DateTime, datetime, None]) – The time from which materialization for this feature view should start. (Required if offline=True)

  • batch_schedule (Optional[str]) – The interval at which batch materialization should be scheduled.

  • description (Optional[str]) – Human readable description.

  • owner (Optional[str]) – Owner name (typically the email of the primary maintainer).

  • family (Optional[str]) – Family of this Feature View, used to group Tecton Objects.

  • tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).

  • batch_cluster_config (Union[ExistingClusterConfig, DatabricksClusterConfig, EMRClusterConfig, None]) – Batch materialization cluster configuration.

  • stream_cluster_config (Union[ExistingClusterConfig, DatabricksClusterConfig, EMRClusterConfig, None]) – Streaming materialization cluster configuration.

  • offline_config (Union[ParquetConfig, DeltaConfig, None]) – Configuration for how data is written to the offline feature store.

  • online_config (Union[DynamoConfig, RedisConfig, None]) – Configuration for how data is written to the online feature store.

  • monitoring (Optional[MonitoringConfig]) – Monitoring configuration for the feature view.

  • timestamp_key (Optional[str]) – The column name that refers to the timestamp for records that are produced by the feature view. (Default: will infer if one column is a Timestamp type.)

  • name_override (Optional[str]) – Unique, human-friendly name override that identifies the FeatureView.

  • max_batch_aggregation_interval (Optional[str]) – (Advanced) Makes the batch job scheduler group jobs together for efficiency.

  • online_serving_index (Optional[List[str]]) – (Advanced) Defines the set of join keys that will be indexed and queryable during online serving.

  • output_stream – (Advanced) Configuration for a stream to which feature outputs are published.

Returns

An object of type tecton.feature_views.MaterializedFeatureView.

An example declaration of a StreamFeatureView:

from tecton import stream_feature_view, Input
from datetime import datetime

# Declare your Entity and StreamDataSource instances here or import them if defined elsewhere in
# your Tecton repo. Check the API reference documentation on how to declare Entity and StreamDataSource
# instances

transactions_stream = ...
user = ...

@stream_feature_view(
    inputs={'transactions': Input(transactions_stream)},
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 5, 20),
    batch_schedule='1d',
    ttl='30days',
    family='fraud',
    description='Last user transaction amount (stream calculated)'
)
def last_transaction_amount_sql(transactions):
    return f'''
        SELECT
            timestamp,
            nameorig as user_id,
            amount
        FROM
            {transactions}
        '''
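
For comparison, the same feature could be expressed in “pyspark” mode, where the transformation function receives a Spark DataFrame and must return a DataFrame. This is a sketch under the same assumptions as the example above: `transactions_stream` and `user` are hypothetical objects declared elsewhere in your Tecton repo.

```python
from tecton import stream_feature_view, Input
from datetime import datetime

# Hypothetical Entity and StreamDataSource instances, assumed to be
# declared or imported elsewhere in your Tecton repo.
transactions_stream = ...
user = ...

@stream_feature_view(
    inputs={'transactions': Input(transactions_stream)},
    entities=[user],
    mode='pyspark',
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 5, 20),
    batch_schedule='1d',
    ttl='30days',
    family='fraud',
    description='Last user transaction amount (stream calculated)'
)
def last_transaction_amount_pyspark(transactions):
    # In 'pyspark' mode the input is a Spark DataFrame and the function
    # returns a DataFrame. Imports are kept inside the function so they
    # are available when the transformation runs on the cluster.
    from pyspark.sql import functions as F
    return transactions.select(
        'timestamp',
        F.col('nameorig').alias('user_id'),
        'amount',
    )
```

The selected columns mirror the SQL example above; only the transformation mode and return type differ.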