tecton.stream_window_aggregate_feature_view

tecton.stream_window_aggregate_feature_view(mode, inputs, entities, aggregation_slide_period, aggregations, online=False, offline=False, feature_start_time=None, batch_schedule=None, max_batch_aggregation_interval=None, online_serving_index=None, batch_cluster_config=None, stream_cluster_config=None, offline_config=ParquetConfig(), monitoring=None, description='', owner='', family='', tags=None, timestamp_key=None, name_override=None)

Declare a stream window aggregate feature view.

Parameters
  • mode (str) – Whether the annotated function is a pipeline function (“pipeline”) or a transformation function (“spark_sql”, “pyspark” or “pandas”). If it’s a transformation mode, we infer the pipeline function.

  • inputs (Dict[str, Input]) – The inputs passed into the pipeline.

  • entities (List[Union[Entity, OverriddenEntity]]) – The entities this feature view is associated with.

  • aggregation_slide_period (str) – How often the feature values are updated. When set to “continuous”, events are processed as they arrive, keeping features as up to date as possible. Otherwise, set the slide period to a time interval, such as “1m” or “1h”.

  • aggregations (List[FeatureAggregation]) – A list of FeatureAggregation structs.

  • online (Optional[bool]) – Whether the feature view should be materialized to the online feature store.

  • offline (Optional[bool]) – Whether the feature view should be materialized to the offline feature store.

  • feature_start_time (Union[DateTime, datetime, None]) – When materialization for this feature view should start from.

  • batch_schedule (Optional[str]) – The interval at which batch materialization should be scheduled.

  • max_batch_aggregation_interval (Optional[str]) – (Advanced) Makes the batch job scheduler group jobs together for efficiency.

  • online_serving_index (Optional[List[str]]) – (Optional, advanced) Defines the set of join keys that will be indexed and queryable during online serving.

  • batch_cluster_config (Union[ExistingClusterConfig, DatabricksClusterConfig, EMRClusterConfig, None]) – Batch materialization cluster configuration. Should be one of: [EMRClusterConfig, DatabricksClusterConfig, ExistingClusterConfig]

  • stream_cluster_config (Union[ExistingClusterConfig, DatabricksClusterConfig, EMRClusterConfig, None]) – Streaming materialization cluster configuration. Should be one of: [EMRClusterConfig, DatabricksClusterConfig, ExistingClusterConfig]

  • offline_config (Union[ParquetConfig, DeltaConfig, None]) – Configuration for how data is written to the offline feature store.

  • monitoring (Optional[MonitoringConfig]) – Monitoring configuration for the feature view.

  • description (str) – (Optional) description.

  • owner (str) – Owner name (typically the email of the primary maintainer).

  • family (str) – (Optional) Family of this Feature View, used to group Tecton Objects.

  • tags (Optional[Dict[str, str]]) – (Optional) Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).

  • timestamp_key (Optional[str]) – The name of the column that holds the timestamp for records produced by the feature view.

  • name_override (Optional[str]) – Unique, human-friendly name override that identifies the FeatureView.

Returns

A Stream Window Aggregate Feature View

An example declaration of a stream window aggregate feature view:

from tecton import stream_window_aggregate_feature_view, Input, FeatureAggregation
from datetime import datetime

# Declare your Entity and StreamDataSource instances here, or import them if they are defined
# elsewhere in your Tecton repo. See the API reference documentation for how to declare Entity
# and StreamDataSource instances.

transactions_stream = ...
user = ...

# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_window_aggregate_feature_view(
    inputs={'transactions': Input(transactions_stream)},
    entities=[user],
    mode='spark_sql',
    aggregation_slide_period='10m',  # Defines how frequently feature values get updated in the online store
    batch_schedule='1d',  # Defines how frequently batch jobs are scheduled to ingest into the offline store
    aggregations=[
        FeatureAggregation(column='amount', function='mean', time_windows=['1h', '12h', '24h', '72h']),
        FeatureAggregation(column='amount', function='sum', time_windows=['1h', '12h', '24h', '72h'])
    ],
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    family='fraud',
    tags={'release': 'production'},
    owner='kevin@tecton.ai',
    description='Transaction amount statistics and total over a series of time windows, updated every 10 minutes.'
)
def user_transaction_amount_metrics(transactions):
    return f'''
        SELECT
            nameorig as user_id,
            amount,
            timestamp
        FROM
            {transactions}
        '''
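
To build intuition for how aggregation_slide_period and time_windows interact in the example above, the following is a minimal plain-Python sketch of a sliding time window aggregation. It does not use Tecton and is not Tecton's implementation. The helper names (parse_interval, sliding_aggregates), the alignment of slide boundaries to the Unix epoch, and the half-open window [start, end) are all assumptions made for illustration. The “continuous” setting is not modeled.

```python
from datetime import datetime, timedelta

# Hypothetical helper (not part of Tecton): turn '10m', '1h', '1d' into a timedelta.
_UNITS = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days'}


def parse_interval(text):
    value, unit = int(text[:-1]), text[-1]
    return timedelta(**{_UNITS[unit]: value})


def sliding_aggregates(events, as_of, slide_period, time_window):
    """Return (sum, mean) over the window that ends at the most recent
    slide boundary at or before `as_of`.

    `events` is a list of (timestamp, amount) pairs.
    Assumption: slide boundaries are aligned to the Unix epoch and the
    window is half-open, [window_end - time_window, window_end).
    """
    slide = parse_interval(slide_period)
    window = parse_interval(time_window)
    epoch = datetime(1970, 1, 1)

    # Round `as_of` down to the latest slide boundary.
    window_end = as_of - (as_of - epoch) % slide
    window_start = window_end - window

    amounts = [amount for ts, amount in events if window_start <= ts < window_end]
    total = sum(amounts)
    mean = total / len(amounts) if amounts else None
    return total, mean


# Made-up transactions for a single user.
events = [
    (datetime(2020, 10, 10, 9, 5), 10.0),
    (datetime(2020, 10, 10, 9, 45), 30.0),
    (datetime(2020, 10, 10, 9, 58), 50.0),
    (datetime(2020, 10, 10, 10, 3), 100.0),
]

# At 10:07 the latest 10-minute boundary is 10:00, so the 10:03 event is not yet visible.
at_1007 = sliding_aggregates(events, datetime(2020, 10, 10, 10, 7), '10m', '1h')

# At 10:12 the boundary has advanced to 10:10: the 10:03 event enters the
# 1-hour window and the 9:05 event drops out of it.
at_1012 = sliding_aggregates(events, datetime(2020, 10, 10, 10, 12), '10m', '1h')
```

Under these assumptions, a shorter slide period makes new events visible sooner, while the time window controls how far back each aggregate looks.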