
Virtual Data Sources

A Virtual Data Source (VDS) connects Tecton to a data source and supplies raw input data to Feature Transformations in a uniform way.

A Virtual Data Source is called "Virtual" because it abstracts the details of connecting to the raw data source and provides a uniform interface to Tecton Feature Transformations. A VDS can point at two different physical data sources:

  • A batch source: Every VDS configures the connection to exactly one batch data source (such as a Hive table, data warehouse table, or a file).
  • A stream source: A VDS optionally also configures the connection to one stream data source (such as a Kafka Topic or a Kinesis Stream). A stream source must be "backed" by a historical event log in a batch source.

Virtual Data Sources provide the following benefits:

  • Virtual Data Sources provide a consistent data source interface, so modelling logic is isolated from the implementation details of any specific data source.
  • Virtual Data Sources merge stream and batch data sources to supply data to Feature Transformations in a consistent way.

The use of a virtual data source is illustrated here:

[Figure: The Flow of Data from a Virtual Data Source]

When Tecton executes a Feature Transformation, it transparently picks the appropriate underlying data source based on the Feature Transformation's requirements.

Defining a Virtual Data Source

To define a VDS, create a configuration object that connects to the raw data source, then include the configuration object in the VirtualDataSource object. Types of configuration objects are listed in "Attributes of a VDS," next.

Attributes of a VDS

The attributes important in defining a VDS are:

  • name: A unique identifier of the VDS. For example, "click_event_log".
  • batch_ds_config: The batch source of the VDS. One of:
    • FileDSConfig: Represents a single-file data source (such as a file on S3)
    • HiveDSConfig: Represents a Hive Table
    • RedshiftDSConfig: A Redshift Table or Query
    • SnowflakeDSConfig: A Snowflake Table or Query
  • stream_ds_config: The optional stream source of the VDS. One of:
    • KafkaDSConfig: A Kafka Topic
    • KinesisDSConfig: A Kinesis Stream

This is a partial list. See the VirtualDataSource reference for detailed descriptions of VDS attributes.
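
As a rough sketch of how these attributes fit together (the configuration object names below are placeholders; the concrete configuration classes and their arguments appear in the examples later on this page):

# Skeleton only; my_batch_config and my_stream_config stand in for one of the
# configuration objects listed above.
my_vds = VirtualDataSource(
    name="my_vds",                      # unique identifier of the VDS
    batch_ds_config=my_batch_config,    # required batch source
    stream_ds_config=my_stream_config,  # optional stream source
)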

Materializing Batch Virtual Data Sources

If a VDS points only at a batch data source, then regardless of the execution mode, Tecton reads raw data only from the specified batch data source.

Batch Materializations

Batch VDSs supply data to two different types of Materialization:

  • Steady state materialization to the offline store
  • Backfill materialization

Steady State Materialization to the Offline Store

Materialization to Tecton's Offline Feature Store uses the VDS's underlying batch source to populate and update the store.

[Figure: Steady State Materialization to the Offline Store]

Backfill Materialization

Backfill materialization writes to both the online store and the offline store from the VDS's underlying batch source, enabling a stream VDS to fulfill requests that include historical data.

[Figure: Backfill Materialization to the Offline and Online Stores]

Example: Defining a Batch-Only Virtual Data Source

The following sample code defines a VDS that is backed only by a Snowflake table.

/feature_store/data_sources.py

click_stream_snowflake_ds = SnowflakeDSConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

click_stream_snowflake_vds = VirtualDataSource(
    name="click_stream_snowflake_vds",
    batch_ds_config=click_stream_snowflake_ds,
)

Materializing Stream Virtual Data Sources

If a VDS points to a stream data source, the VDS reads data from both its batch data source and the stream source. A stream data source is backed by a batch data source, which Tecton uses to process and store the historical feature data. (See Backfill Materialization, above.)

Stream Materializations

Stream VDSs implement steady state materialization to the Online Feature Store.

Materialization to Tecton's Online Feature Store uses the VDS's underlying stream source to populate data for use in prediction.

[Figure: Stream Materialization to the Online Store]

As a result, steady state materialization jobs that write to Tecton's online feature store are executed as ongoing stream data processing jobs.

Deserializing Data from a Stream

A VDS needs to know how a data stream is serialized in order to extract the raw data. The data could be, for example, Avro, JSON, or Protobuf; the VDS applies this information in its deserialization logic.

You provide deserialization logic in a stream data source's raw_stream_translator parameter. The translator is a Python function containing PySpark code that takes the raw stream records as a PySpark DataFrame and returns the deserialized DataFrame. Building on the example above, here's a stream translator that unpacks a payload nested in a raw JSON event envelope:

from tecton_spark.function_serialization import inlined

# The @inlined decorator allows Tecton to serialize this function so it can
# run inside materialization jobs.
@inlined
def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StringType, StructType, TimestampType

    # Schema of the "payload" object nested inside each raw JSON event.
    PAYLOAD_SCHEMA = (
        StructType()
        .add("accountId", StringType(), False)
        .add("description", StringType(), False)
        .add("transaction_id", StringType(), False)
    )

    # Schema of the full event envelope: a timestamp plus the nested payload.
    EVENT_ENVELOPE_SCHEMA = (
        StructType()
        .add("timestamp", TimestampType(), False)
        .add("payload", PAYLOAD_SCHEMA, False)
    )

    # Parse the raw Kafka "value" column as JSON, then flatten the fields the
    # batch source's schema expects.
    value = col("value").cast("string")
    df = df.withColumn("event", from_json(value, EVENT_ENVELOPE_SCHEMA))
    df = df.withColumn("accountId", col("event.payload.accountId"))
    df = df.withColumn("description", col("event.payload.description"))
    df = df.withColumn("transaction_id", col("event.payload.transaction_id"))
    df = df.withColumn("timestamp", col("event.timestamp"))

    return df

click_stream_kafka_ds = KafkaDSConfig(
    default_watermark_delay_threshold="7 days",
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_key="click_event_timestamp",
    raw_stream_translator=raw_data_deserialization,
)

The stream translator enables you to turn a stream's raw data into a schema that matches the batch data source's schema. Tecton validates that the schemas match.
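
As an illustrative sketch (not part of the Tecton API), one way to sanity-check a translator before registering the VDS is to run it locally against a hand-built DataFrame whose value column mimics a raw Kafka record, then inspect the output schema. The Spark session setup and the sample event below are assumptions made for this example:

import json

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("translator-check").getOrCreate()

# Simulate a raw Kafka record: a single "value" column holding the JSON event envelope.
sample_event = {
    "timestamp": "2021-01-01T00:00:00.000Z",
    "payload": {
        "accountId": "a-123",
        "description": "clicked checkout",
        "transaction_id": "t-456",
    },
}
raw_df = spark.createDataFrame([(json.dumps(sample_event),)], ["value"])

# Apply the translator defined above and confirm the columns line up with the
# batch source's schema. (If the @inlined decorator interferes with calling the
# function directly, test an undecorated copy.)
translated_df = raw_data_deserialization(raw_df)
translated_df.printSchema()
translated_df.select("accountId", "description", "transaction_id", "timestamp").show(truncate=False)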

Example: Defining a Stream Virtual Data Source

The sample below shows code that defines a VDS backed by a Kafka topic and a Snowflake table:

/feature_store/data_sources.py

click_stream_snowflake_ds = SnowflakeDSConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

click_stream_kafka_ds = KafkaDSConfig(
    default_watermark_delay_threshold="7 days",
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_key="click_event_timestamp",
    raw_stream_translator=raw_data_deserialization,
)

click_stream_snowflake_vds = VirtualDataSource(
    name="click_stream_snowflake_vds",
    batch_ds_config=click_stream_snowflake_ds,
    stream_ds_config=click_stream_kafka_ds,
)