tecton.batch_window_aggregate_feature_view

tecton.batch_window_aggregate_feature_view(*, mode, inputs, entities, aggregation_slide_period, aggregations, online=False, offline=False, feature_start_time=None, batch_schedule=None, description=None, owner=None, family=None, tags=None, batch_cluster_config=None, offline_config=ParquetConfig(), online_config=None, monitoring=None, timestamp_key=None, name_override=None, max_batch_aggregation_interval=None, online_serving_index=None)

Declare a batch window aggregate feature view.

Parameters
  • mode (str) – Whether the annotated function is a pipeline function (“pipeline” mode) or a transformation function (“spark_sql” or “pyspark” mode). For the non-pipeline mode, an inferred transformation will also be registered.

  • inputs (Dict[str, Input]) – The inputs passed into the pipeline.

  • entities (List[Union[Entity, OverriddenEntity]]) – The entities this feature view is associated with.

  • aggregation_slide_period (str) – How frequently the feature value is updated (for example, “1h” or “6h”).

  • aggregations (List[FeatureAggregation]) – A list of FeatureAggregation structs.

  • online (Optional[bool]) – Whether the feature view should be materialized to the online feature store. (Default: False)

  • offline (Optional[bool]) – Whether the feature view should be materialized to the offline feature store. (Default: False)

  • feature_start_time (Union[DateTime, datetime, None]) – The time from which materialization for this feature view should begin. (Required if offline=True)

  • batch_schedule (Optional[str]) – The interval at which batch materialization should be scheduled.

  • description (Optional[str]) – Human readable description.

  • owner (Optional[str]) – Owner name (typically the email of the primary maintainer).

  • family (Optional[str]) – Family of this Feature View, used to group Tecton Objects.

  • tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).

  • batch_cluster_config (Union[ExistingClusterConfig, DatabricksClusterConfig, EMRClusterConfig, None]) – Batch materialization cluster configuration.

  • offline_config (Union[ParquetConfig, DeltaConfig, None]) – Configuration for how data is written to the offline feature store.

  • online_config (Union[DynamoConfig, RedisConfig, None]) – Configuration for how data is written to the online feature store.

  • monitoring (Optional[MonitoringConfig]) – Monitoring configuration for the feature view.

  • timestamp_key (Optional[str]) – The column name that refers to the timestamp for records produced by the feature view. (Default: inferred if exactly one column is of Timestamp type.)

  • name_override (Optional[str]) – Unique, human-friendly name override that identifies the FeatureView.

  • max_batch_aggregation_interval (Optional[str]) – (Advanced) Makes the batch job scheduler group jobs together for efficiency.

  • online_serving_index (Optional[List[str]]) – (Advanced) Defines the set of join keys that will be indexed and queryable during online serving.

Returns

An object of type tecton.feature_views.MaterializedFeatureView.

An example declaration of a batch window aggregate feature view:

from tecton.feature_views import batch_window_aggregate_feature_view
from tecton.feature_views.feature_view import Input
from tecton import FeatureAggregation
from datetime import datetime

# Declare your Entity and BatchDataSource instances here, or import them if they are
# defined elsewhere in your Tecton repo. See the API reference documentation for how to
# declare Entity and BatchDataSource instances.

transactions_batch = ...
user = ...

@batch_window_aggregate_feature_view(
    inputs={'transactions': Input(transactions_batch)},
    entities=[user],
    mode='spark_sql',
    aggregation_slide_period='1d',
    aggregations=[FeatureAggregation(column='transaction', function='count',
                                     time_windows=['24h','72h','168h', '960h'])],
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    family='fraud',
    tags={'release': 'production'},
    owner='matt@tecton.ai',
    description='User transaction counts over a series of time windows, updated daily.'
)
def user_transaction_counts(transactions):
    return f'''
        SELECT
            nameorig as user_id,
            1 as transaction,
            timestamp
        FROM
            {transactions}
        '''
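For intuition, the semantics implied by aggregation_slide_period and time_windows can be sketched in plain Python. This is an illustration of sliding-window aggregation in general, not Tecton code: at each slide boundary, the feature value for a given window is the aggregate (here, a count) over records whose timestamps fall within the preceding window.

```python
from datetime import datetime, timedelta

# Illustrative sketch only (plain Python, not the Tecton SDK): each feature
# value at a slide boundary t aggregates records in [t - window, t).
def sliding_window_counts(events, slide_period, window, start, end):
    """events: list of (timestamp, value) pairs; returns {window_end: count}."""
    results = {}
    t = start + slide_period
    while t <= end:
        window_start = t - window
        # Count events whose timestamps fall inside the current window.
        results[t] = sum(1 for ts, _ in events if window_start <= ts < t)
        t += slide_period
    return results

events = [
    (datetime(2020, 10, 10, 3), 1),
    (datetime(2020, 10, 10, 15), 1),
    (datetime(2020, 10, 12, 9), 1),
]

# A 1-day slide with a 72h window, mirroring aggregation_slide_period='1d'
# and one of the time_windows values ('72h') in the example above.
counts = sliding_window_counts(
    events,
    slide_period=timedelta(days=1),
    window=timedelta(hours=72),
    start=datetime(2020, 10, 10),
    end=datetime(2020, 10, 13),
)
# counts[datetime(2020, 10, 11)] -> 2 (both Oct 10 events fall in [Oct 8, Oct 11))
# counts[datetime(2020, 10, 13)] -> 3 (all three events fall in [Oct 10, Oct 13))
```

In the declaration above, Tecton computes one such series per (column, function, time_window) combination, so the four time_windows values yield four features per user.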