Data Sources

A Tecton Data Source defines the connection between Tecton and where you store your data. Once you've registered a Data Source in Tecton, you can access that data to create features with a Feature View.

The Data Source abstraction simplifies your feature definitions by:

  • Providing a consistent data source interface, so modelling logic is isolated from the implementation details of any specific data source.
  • Merging stream and batch data sources so they supply data to Feature Transformations in a consistent way.

There are two types of data sources:

  1. BatchDataSource: Configures the connection to exactly one batch data source, such as a Hive table, data warehouse table, or a file.
  2. StreamDataSource: Configures the connection to one stream data source (such as a Kafka Topic or a Kinesis Stream). A stream source must be "backed" by a historical event log in a batch source.

Batch Data Sources

Defining a BatchDataSource enables you to create features from your analytical data with a BatchFeatureView or BatchWindowAggregateFeatureView. When calculating feature values, Tecton will simply read from the underlying data source.

Defining a BatchDataSource

First, create a configuration object that connects to a batch data source.

Now simply define the BatchDataSource with:

  • name: A unique identifier of the DS. For example, "click_event_log".
  • batch_ds_config: The batch source of the DS. One of:
    • FileDSConfig: Represents a single-file data source (such as a file on S3)
    • HiveDSConfig: Represents a Hive Table
    • RedshiftDSConfig: A Redshift Table or Query
    • SnowflakeDSConfig: A Snowflake Table or Query

Additionally, a StreamDataSource requires a stream_ds_config. One of:

  • KafkaDSConfig: A Kafka Topic
  • KinesisDSConfig: A Kinesis Stream

See the DataSource reference for detailed descriptions of Data Source attributes.
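
As a rough illustration of the other config classes, here is a minimal, hypothetical sketch of a Hive-backed batch config and a Kinesis-backed stream config. All parameter values are placeholders, and the exact set of required arguments (timestamp columns, initial stream position, and so on) is an assumption here; consult the DataSource reference for the authoritative list.

# Hypothetical sketches; the database, table, stream, and region values are placeholders.
click_event_log_hive = HiveDSConfig(
    database="click_db",        # Hive database containing the historical event log
    table="click_event_log",    # Hive table with the same schema as the stream
)

click_event_stream_kinesis = KinesisDSConfig(
    stream_name="click-events",  # Kinesis stream carrying live click events
    region="us-west-2",
    # A raw_stream_translator is typically also needed; see the stream
    # translator example in the Stream Data Sources section below.
)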

The following sample code defines a Data Source that is backed only by a Snowflake table.

In /feature_store/data_sources.py:

click_stream_snowflake_ds = SnowflakeDSConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

clickstream_snowflake_ds = BatchDataSource(
    name="click_stream_snowflake_ds",
    batch_ds_config=click_stream_snowflake_ds,
)
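
Once the Data Source is registered, a Feature View can read from it. The sketch below is hypothetical: the exact decorator arguments vary by SDK version, and the user entity is assumed to be defined elsewhere in your feature repository.

from datetime import datetime
from tecton import batch_feature_view, Input

# Hypothetical sketch: `user` is an Entity assumed to be defined elsewhere,
# and the decorator arguments may differ across SDK versions.
@batch_feature_view(
    inputs={"click_stream": Input(clickstream_snowflake_ds)},
    entities=[user],
    mode="spark_sql",
    online=False,
    offline=True,
    feature_start_time=datetime(2021, 1, 1),
    batch_schedule="1d",
    ttl="30d",
)
def click_events(click_stream):
    # Select columns from the Snowflake-backed data source.
    return f"""
        SELECT accountId, description, transaction_id, timestamp
        FROM {click_stream}
    """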

Stream Data Sources

Tecton makes it easy to provide up-to-date streaming data for your models. Once you've defined a StreamDataSource, you can create features with a StreamFeatureView or StreamWindowAggregateFeatureView.

How Tecton uses the stream and batch sources in your StreamDataSource

Tecton allows you to seamlessly backfill and process streaming events from a single feature definition. In order to enable backfills, Tecton requires that a StreamDataSource define both the stream input and a batch source containing a log of historical events.

Tecton reads from the batch source when materializing to the offline store or when you interactively query the data source from a notebook. Materialization to the online store reads from the stream source.
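
For example, an interactive notebook query against the registered source would be served from the batch side. The snippet below is purely illustrative; the method names (get_data_source, get_dataframe, to_pandas) are assumptions and may differ in your SDK version.

import tecton

# Illustrative only: method names are assumptions and may differ by SDK version.
ds = tecton.get_data_source("click_stream_ds")  # the StreamDataSource defined below
ds.get_dataframe().to_pandas().head()           # served from the batch (Snowflake) source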

Deserializing Data from a Stream for Data Source Configurations

A StreamDataSource needs to know how a data stream is serialized in order to extract the raw data. The data could be, for example, Avro, JSON, or Protobuf; the Data Source applies this information in its deserialization logic.

You provide deserialization logic in a stream data source's raw_stream_translator parameter. The translator is a Python function containing PySpark code that deserializes a PySpark DataFrame. Building on the example above, here's a stream translator that unpacks a payload nested in a raw JSON event envelope:

from tecton_spark.function_serialization import inlined

@inlined
def raw_data_deserialization(df):
    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StringType, TimestampType

    # Schema of the JSON payload nested inside each event envelope.
    PAYLOAD_SCHEMA = (
        StructType()
        .add("accountId", StringType(), False)
        .add("description", StringType(), False)
        .add("transaction_id", StringType(), False)
    )

    # Schema of the raw event envelope: a timestamp plus the nested payload.
    EVENT_ENVELOPE_SCHEMA = (
        StructType()
        .add("timestamp", TimestampType(), False)
        .add("payload", PAYLOAD_SCHEMA, False)
    )

    # Kafka delivers the message body in the binary `value` column;
    # cast it to a string and parse it as JSON.
    value = col("value").cast("string")
    df = df.withColumn("event", from_json(value, EVENT_ENVELOPE_SCHEMA))

    # Flatten the parsed event into top-level columns that match the
    # batch data source's schema.
    df = df.withColumn("accountId", col("event.payload.accountId"))
    df = df.withColumn("description", col("event.payload.description"))
    df = df.withColumn("transaction_id", col("event.payload.transaction_id"))
    df = df.withColumn("timestamp", col("event.timestamp"))

    return df

click_stream_kafka_ds = KafkaDSConfig(
    default_watermark_delay_threshold="7 days",
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_key="click_event_timestamp",
    raw_stream_translator=raw_data_deserialization,
)

The stream translator enables you to turn a stream's raw data into a schema that matches the batch data source's schema. Tecton validates that the schemas match.
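
If you want to sanity-check a translator before registering it, you can run it against a hand-built DataFrame shaped like a raw Kafka record (a value column holding the JSON envelope). This is a local sketch only, and it assumes the decorated function is still directly callable; it is not part of the Tecton repository definition.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# One fake Kafka record: the JSON event envelope in the `value` column.
sample_event = (
    '{"timestamp": "2021-06-01T12:00:00", "payload": {"accountId": "a-123", '
    '"description": "click", "transaction_id": "t-456"}}'
)
raw_df = spark.createDataFrame([(sample_event,)], ["value"])

# The output columns should line up with the batch source's schema.
raw_data_deserialization(raw_df).select(
    "accountId", "description", "transaction_id", "timestamp"
).show(truncate=False)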

Example: Defining a StreamDataSource

Using the stream translator defined above, the sample below defines a Data Source backed by a Kafka topic and a Snowflake table.

In /feature_store/data_sources.py:

click_stream_snowflake_ds = SnowflakeDSConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

click_stream_kafka_ds = KafkaDSConfig(
    default_watermark_delay_threshold="7 days",
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_key="click_event_timestamp",
    raw_stream_translator=raw_data_deserialization,
)

click_stream_ds = StreamDataSource(
    name="click_stream_ds",
    batch_ds_config=click_stream_snowflake_ds,
    stream_ds_config=click_stream_kafka_ds,
)
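
Once both objects are committed to your feature repository, register them by running tecton apply from the repository root. The StreamDataSource can then be referenced from a StreamFeatureView or StreamWindowAggregateFeatureView, just as the batch example above referenced its BatchDataSource.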