Data Sources

Tecton Data Sources declare Tecton's connection to your data. Feature Views read from Data Sources to create features.

The Data Source abstraction simplifies your feature definitions by:

  • Providing a consistent data source interface that isolates modeling logic from the implementation details of any specific data source.
  • Merging stream and batch data sources to consistently supply online and historical data to Streaming Feature Views.

There are two types of Data Sources:

  1. BatchSource: Configures the connection to exactly one batch data source, such as a Hive table, data warehouse table, or a file.
  2. StreamSource: Configures the connection to one stream data source (such as a Kafka Topic or a Kinesis Stream). A stream source must be "backed" by a historical event log in a batch source.

Batch Sources

A BatchSource is required to create features from your data with a BatchFeatureView. When calculating feature values, Tecton will read data from the data source defined by BatchSource.

A BatchSource may optionally specify a timestamp column representing the time of each record. Values in the timestamp column must be in one of the following formats:

  • A native TimestampType object.
  • A string representing a timestamp that can be parsed by the default Spark SQL format: yyyy-MM-dd'T'hh:mm:ss.SSS'Z'.
  • A string representing a timestamp in a custom format, for which you can provide a timestamp_format to parse the string (see the sketch below). The format must follow this guideline.
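
As an illustration, a custom timestamp format can be declared directly on the batch configuration. Below is a minimal sketch; the Hive database, table, and column names are hypothetical:

from tecton import HiveConfig

# A sketch only: the database, table, and column names are hypothetical.
click_stream_hive_config = HiveConfig(
    database="your_glue_database",
    table="click_stream",
    timestamp_field="created_at",
    # Custom format for string timestamps such as "2023-01-15 09:30:00",
    # following Spark's datetime pattern syntax.
    timestamp_format="yyyy-MM-dd HH:mm:ss",
)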

Defining a BatchSource

  1. Create a configuration that is specific to your data source. Tecton currently supports:

    • FileConfig: Single-file data source (such as a file on S3)
    • HiveConfig: Hive (or Glue) Table
    • RedshiftConfig: Redshift Table or Query
    • SnowflakeConfig: Snowflake Table or Query

    Note

    Tecton on Snowflake only supports SnowflakeConfig.

    The complete list of configurations can be found in the API Reference.

  2. Define a BatchSource that references the configuration defined in the previous step:

    • name: A unique identifier for the batch source. For example, "click_event_log".
    • batch_config: The configuration created in the step above.

See the Data Source API reference for detailed descriptions of Data Source attributes.

Example

The following example defines a batch source based on a Snowflake table.

from tecton import BatchSource, SnowflakeConfig

click_stream_snowflake_ds = SnowflakeConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

clickstream_snowflake_ds = BatchSource(
    name="click_stream_snowflake_ds",
    batch_config=click_stream_snowflake_ds,
)
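
Other batch configurations follow the same pattern. For instance, here is a minimal sketch of a file-based source; the S3 path and timestamp column are hypothetical:

from tecton import BatchSource, FileConfig

# A sketch only: the S3 URI and timestamp column are hypothetical.
click_stream_file_config = FileConfig(
    uri="s3://your-bucket/click_stream/data.parquet",
    file_format="parquet",
    timestamp_field="timestamp",
)

click_stream_file_ds = BatchSource(
    name="click_stream_file_ds",
    batch_config=click_stream_file_config,
)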

Stream Sources

Tecton makes it easy to provide up-to-date streaming data for your models. Once you've defined a StreamSource, you can create features from it with a StreamFeatureView.

Stream + Batch Requirement

Tecton backfills and processes streaming events from a single feature pipeline definition. To enable backfills, Tecton requires a StreamSource to define both a streaming input and a log of historical events from a batch source.

Tecton reads from the batch source for backfills, when materializing to the offline store, and when reading historical data interactively using the SDK. Steady-state materialization to the online store reads from the stream source.
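
For example, historical data from a registered source can be read interactively. The following is a minimal sketch, assuming a workspace named "prod", an applied data source named "click_stream_ds" (both hypothetical), and the SDK's get_dataframe method:

import tecton

# Hypothetical names: the workspace and the data source are assumed to have
# been applied already.
ws = tecton.get_workspace("prod")
ds = ws.get_data_source("click_stream_ds")

# Interactive reads pull from the batch source that backs the StreamSource.
events = ds.get_dataframe().to_pandas()
print(events.head())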

Defining a StreamSource

  1. Create a configuration that is specific to your streaming data source. Tecton currently supports:

    • KinesisConfig: an AWS Kinesis Stream
    • KafkaConfig: a Kafka topic

    The complete list of configurations can be found in the API Reference.

  2. Create a configuration for your batch data source, such as HiveConfig or SnowflakeConfig.

  3. Define a StreamSource that references the configurations defined in the previous steps:

    • name: A unique identifier for the stream source. For example, "click_event_log".
    • batch_config: The batch source configuration created in step 2.
    • stream_config: The stream source configuration created in step 1.

Streaming Message Deserialization

A StreamSource requires instructions for deserializing each message. These are provided via the post_processor parameter of the stream configuration (for example, KafkaConfig), whose value is a function containing PySpark code that deserializes a PySpark DataFrame. The function maps the schema of the stream's raw data to the batch data source's schema. When Tecton runs the function, it validates that the schemas match.

Building on the example above, here's a deserialization function that unpacks a payload nested in a raw JSON event envelope:

def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StringType, StructType, TimestampType

    # Schema of the JSON payload nested inside each event envelope.
    PAYLOAD_SCHEMA = (
        StructType()
        .add("accountId", StringType(), False)
        .add("description", StringType(), False)
        .add("transaction_id", StringType(), False)
    )

    # Schema of the raw event envelope read from the stream.
    EVENT_ENVELOPE_SCHEMA = (
        StructType()
        .add("timestamp", TimestampType(), False)
        .add("payload", PAYLOAD_SCHEMA, False)
    )

    # Parse the raw message value as JSON, then flatten the payload fields
    # so that the result matches the batch data source's schema.
    value = col("value").cast("string")
    df = df.withColumn("event", from_json(value, EVENT_ENVELOPE_SCHEMA))
    df = df.withColumn("accountId", col("event.payload.accountId"))
    df = df.withColumn("description", col("event.payload.description"))
    df = df.withColumn("transaction_id", col("event.payload.transaction_id"))
    df = df.withColumn("timestamp", col("event.timestamp"))

    return df

Example

Using the deserialization function defined above, the following example defines a StreamSource backed by a Kafka topic and a Snowflake table.

from datetime import timedelta

from tecton import KafkaConfig, SnowflakeConfig, StreamSource

click_stream_snowflake_ds = SnowflakeConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

click_stream_kafka_ds = KafkaConfig(
    watermark_delay_threshold=timedelta(days=7),
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_field="click_event_timestamp",
    # Deserialization function defined above.
    post_processor=raw_data_deserialization,
)

click_stream_ds = StreamSource(
    name="click_stream_ds",
    batch_config=click_stream_snowflake_ds,
    stream_config=click_stream_kafka_ds,
)