tecton.stream_feature_view

tecton.stream_feature_view(mode, inputs, entities, online=False, offline=False, ttl=None, feature_start_time=None, batch_schedule=None, max_batch_aggregation_interval=None, online_serving_index=None, batch_cluster_config=None, stream_cluster_config=None, offline_config=ParquetConfig(), online_config=None, monitoring=None, description='', owner='', family='', tags=None, timestamp_key=None, name_override=None)

Declare a stream feature view

Parameters
  • mode (str) – Whether the decorated function is a pipeline function ('pipeline') or a transformation function ('spark_sql', 'pyspark', or 'pandas'). If a transformation mode is given, the decorated function is treated as the transformation and the pipeline function is inferred automatically (see the pipeline-mode sketch after the example at the end of this page).

  • inputs (Dict[str, Input]) – The inputs passed into the pipeline.

  • entities (List[Union[Entity, OverriddenEntity]]) – The entities this feature view is associated with.

  • online (Optional[bool]) – Whether the feature view should be materialized to the online feature store.

  • offline (Optional[bool]) – Whether the feature view should be materialized to the offline feature store.

  • ttl (Optional[str]) – The TTL (time-to-live) for features defined by this feature view (e.g. '30days').

  • feature_start_time (Union[DateTime, datetime, None]) – The time from which this feature view should start materializing features.

  • batch_schedule (Optional[str]) – The interval at which batch materialization should be scheduled.

  • max_batch_aggregation_interval (Optional[str]) – (Advanced) Makes the batch job scheduler group jobs together for efficiency.

  • online_serving_index (Optional[List[str]]) – (Optional, advanced) Defines the set of join keys that will be indexed and queryable during online serving.

  • batch_cluster_config (Union[ExistingClusterConfig, DatabricksClusterConfig, EMRClusterConfig, None]) – Batch materialization cluster configuration. Should be one of: [EMRClusterConfig, DatabricksClusterConfig, ExistingClusterConfig]. A configuration sketch follows the example at the end of this page.

  • stream_cluster_config (Union[ExistingClusterConfig, DatabricksClusterConfig, EMRClusterConfig, None]) – Streaming materialization cluster configuration. Should be one of: [EMRClusterConfig, DatabricksClusterConfig, ExistingClusterConfig]

  • offline_config (Union[ParquetConfig, DeltaConfig, None]) – Configuration for how data is written to the offline feature store.

  • online_config (Union[DynamoConfig, RedisConfig, None]) – Configuration for how data is written to the online feature store.

  • monitoring (Optional[MonitoringConfig]) – Monitoring configuration for the feature view.

  • description (str) – (Optional) Human-readable description of the feature view.

  • owner (str) – Owner name (typically the email of the primary maintainer).

  • family (str) – (Optional) Family of this Feature View, used to group Tecton Objects.

  • tags (Optional[Dict[str, str]]) – (Optional) Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).

  • timestamp_key (Optional[str]) – The column name that refers to the timestamp of records produced by the feature view.

  • name_override (Optional[str]) – Unique, human-friendly name override that identifies the FeatureView.

Returns

A Stream Feature View

An example declaration of a StreamFeatureView:

from tecton import stream_feature_view, Input
from datetime import datetime

# Declare your Entity and StreamDataSource instances here or import them if defined elsewhere in
# your Tecton repo. Check the API reference documentation on how to declare Entity and StreamDataSource
# instances

transactions_stream = ...
user = ...

@stream_feature_view(
    inputs={'transactions': Input(transactions_stream)},
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 5, 20),
    batch_schedule='1d',
    ttl='30days',
    family='fraud',
    description='Last user transaction amount (stream calculated)'
)
def last_transaction_amount_sql(transactions):
    return f'''
        SELECT
            timestamp,
            nameorig as user_id,
            amount
        FROM
            {transactions}
        '''
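
The example above uses a transformation mode ('spark_sql'), so the decorated function is treated as the transformation itself. In 'pipeline' mode, the function instead composes separately declared transformations. The following is a minimal sketch of that pattern, reusing the transactions_stream and user placeholders from above; transaction_amount_tx and last_transaction_amount_pipeline are illustrative names, and the @transformation decorator is documented on its own reference page.

from tecton import transformation, stream_feature_view, Input
from datetime import datetime

# An illustrative transformation; see the @transformation reference for details.
@transformation(mode='spark_sql')
def transaction_amount_tx(transactions):
    return f'''
        SELECT timestamp, nameorig AS user_id, amount FROM {transactions}
        '''

@stream_feature_view(
    inputs={'transactions': Input(transactions_stream)},
    entities=[user],
    mode='pipeline',  # the function below is a pipeline, not a transformation
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 5, 20),
    batch_schedule='1d',
    ttl='30days',
    description='Last user transaction amount (pipeline mode)'
)
def last_transaction_amount_pipeline(transactions):
    # In pipeline mode, the body only invokes declared transformations.
    return transaction_amount_tx(transactions)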
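
Materialization infrastructure, offline store format, and monitoring can likewise be configured through the same decorator. The sketch below is illustrative rather than definitive: the instance_type and number_of_workers arguments to DatabricksClusterConfig, and the monitor_freshness, expected_feature_freshness, and alert_email arguments to MonitoringConfig, are assumptions based on typical usage, so check the respective reference pages for the exact signatures in your SDK version.

from tecton import (
    stream_feature_view, Input,
    DatabricksClusterConfig, DeltaConfig, MonitoringConfig,
)
from datetime import datetime

@stream_feature_view(
    inputs={'transactions': Input(transactions_stream)},
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 5, 20),
    batch_schedule='1d',
    ttl='30days',
    # Run batch materialization jobs on a dedicated cluster
    # (parameter names below are assumed; see DatabricksClusterConfig).
    batch_cluster_config=DatabricksClusterConfig(
        instance_type='m5.2xlarge',
        number_of_workers=4,
    ),
    # Write offline data as Delta instead of the default Parquet.
    offline_config=DeltaConfig(),
    # Alert when features become stale
    # (argument names below are assumed; see MonitoringConfig).
    monitoring=MonitoringConfig(
        monitor_freshness=True,
        expected_feature_freshness='2h',
        alert_email='data-team@example.com',
    ),
    description='Last user transaction amount (with explicit materialization config)'
)
def last_transaction_amount_configured(transactions):
    return f'''
        SELECT timestamp, nameorig AS user_id, amount FROM {transactions}
        '''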