tecton.StreamDataSource

class tecton.StreamDataSource(*, name, description='', family='', tags=None, owner='', batch_ds_config, stream_ds_config)

Declare a StreamDataSource, used to read streaming data into Tecton.

StreamFeatureViews and StreamWindowAggregateFeatureViews ingest data from StreamDataSources. A StreamDataSource contains both a batch and a stream data source config (see the declaration example below).

Methods

__init__(*, name, description='', family='', tags=None, owner='', batch_ds_config, stream_ds_config)

Creates a new StreamDataSource.

Parameters
  • name (str) – A unique name for the DataSource.

  • description (str) – (Optional) Description.

  • family (str) – (Optional) Family of this DataSource, used to group Tecton Objects.

  • tags (Optional[Dict[str, str]]) – (Optional) Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).

  • owner (str) – (Optional) Owner name (typically the email of the primary maintainer).

  • batch_ds_config (Union[FileDSConfig, HiveDSConfig, RedshiftDSConfig, SnowflakeDSConfig]) – A BatchDSConfig object describing the batch data source to include in this DataSource.

  • stream_ds_config (Union[KinesisDSConfig, KafkaDSConfig]) – A StreamDSConfig object describing the stream data source to include in this DataSource.

Returns

A StreamDataSource class instance.

Example of a StreamDataSource declaration:

   import pyspark
   from tecton import StreamDataSource, KinesisDSConfig, HiveDSConfig


   # Define the raw stream translator used to deserialize records from the stream
   def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
       from pyspark.sql.functions import col, from_json, from_utc_timestamp
       from pyspark.sql.types import StructType, StringType

       payload_schema = (
           StructType()
           .add('amount', StringType(), False)
           .add('isFraud', StringType(), False)
           .add('timestamp', StringType(), False)
       )
       return (
           df.selectExpr('cast (data as STRING) jsonData')
           .select(from_json('jsonData', payload_schema).alias('payload'))
           .select(
               col('payload.amount').cast('long').alias('amount'),
               col('payload.isFraud').cast('long').alias('isFraud'),
               from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
           )
       )

   # Declare a StreamDataSource with both a batch_ds_config and a stream_ds_config as parameters
   # See the API documentation for both BatchDSConfig and StreamDSConfig
   transactions_stream = StreamDataSource(
                           name='transactions_stream',
                           stream_ds_config=KinesisDSConfig(
                               stream_name='transaction_events',
                               region='us-west-2',
                               default_initial_stream_position='latest',
                               default_watermark_delay_threshold='30 minutes',
                               timestamp_key='timestamp',
                               raw_stream_translator=raw_data_deserialization,
                               options={'roleArn': 'arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro'}
                           ),
                           batch_ds_config=HiveDSConfig(
                               database='demo_fraud',
                               table='transactions',
                               timestamp_column_name='timestamp',
                           ),
                           family='fraud',
                           owner='jules@tecton.ai',
                            tags={'release': 'staging',
                                  'source': 'mobile'})
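
To sanity-check the raw stream translator before applying the declaration, you can run it locally against a handcrafted DataFrame. The following is a minimal sketch, not part of the Tecton API; the Spark session setup and the sample record are illustrative assumptions:

   from pyspark.sql import SparkSession

   # Illustrative local check; in production Tecton applies the translator to the stream.
   spark = SparkSession.builder.master('local[1]').getOrCreate()
   raw = spark.createDataFrame(
       [('{"amount": "100", "isFraud": "0", "timestamp": "2021-01-01 00:00:00"}',)],
       ['data'],  # matches the 'data' column the translator reads
   )
   translated = raw_data_deserialization(raw)
   translated.printSchema()  # amount: long, isFraud: long, timestamp: timestamp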

Attributes

name

The name of this DataSource.

timestamp_key

The name of the timestamp column or key of this DataSource.
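
Continuing the example above, these attributes can be read directly off the declared object; the commented values follow from that declaration and are shown purely as an illustration:

   print(transactions_stream.name)           # 'transactions_stream'
   print(transactions_stream.timestamp_key)  # 'timestamp'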