Data Sources
Tecton Data Sources declare Tecton's connection to your data. Feature Views read from Data Sources to create features.
The Data Source abstraction simplifies your feature definitions by:
- Providing a consistent data source interface that isolates modeling logic from the implementation details of any specific data source.
- Merging stream and batch data sources to consistently supply online and historical data to Streaming Feature Views.
There are two types of Data Sources:
BatchSource
: Configures the connection to exactly one batch data source, such as a Hive table, data warehouse table, or a file.
StreamSource
: Configures the connection to one stream data source (such as a Kafka topic or a Kinesis stream). A stream source must be "backed" by a historical event log in a batch source.
Batch Sources
A BatchSource is required to create features from your data with a BatchFeatureView. When calculating feature values, Tecton reads data from the data source defined by the BatchSource.
The data source referenced by a BatchSource may optionally contain a timestamp column representing the time of each record. Values in the timestamp column must be in one of the following formats:
- A native TimestampType object.
- A string representing a timestamp that can be parsed by Spark SQL's default format, yyyy-MM-dd'T'hh:mm:ss.SSS'Z'.
- A customized string representing a timestamp, for which you provide a custom timestamp_format to parse the string. The format must follow Spark's datetime pattern guidelines.
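For example, a file-backed source whose timestamps are stored as custom-formatted strings might be declared roughly as in the sketch below. This is a hedged sketch, not a definitive example: the S3 path and column name are hypothetical, and the FileConfig parameters shown (uri, file_format, timestamp_field, timestamp_format) should be checked against the API Reference.
from tecton import FileConfig

# Sketch: a single-file batch source whose timestamps are strings like "2021-01-31 23:59:59"
click_stream_file_config = FileConfig(
    uri="s3://your-bucket/click_stream/",     # hypothetical S3 location
    file_format="parquet",
    timestamp_field="event_ts",               # column holding each record's time
    timestamp_format="yyyy-MM-dd HH:mm:ss",   # custom pattern used to parse the strings
)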
Defining a BatchSource
- Create a configuration that is specific to your data source. Tecton currently supports:
  - FileConfig: Single-file data source (such as a file on S3)
  - HiveConfig: Hive (or Glue) table
  - RedshiftConfig: Redshift table or query
  - SnowflakeConfig: Snowflake table or query
  Note
  Tecton on Snowflake only supports SnowflakeConfig. The complete list of configurations can be found in the API Reference.
- Define a BatchSource that references the configuration defined in the previous step:
  - name: A unique identifier for the batch source. For example, "click_event_log".
  - batch_config: The configuration created in the step above.
See the Data Source API reference for detailed descriptions of Data Source attributes.
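For instance, a batch source backed by a Hive (or Glue) table could be configured along the lines of the sketch below. This is a hedged sketch rather than a definitive example: the database, table, and timestamp column names are hypothetical, and the HiveConfig parameters shown should be verified against the API Reference.
from tecton import BatchSource, HiveConfig

# Sketch: a HiveConfig pointing at a hypothetical Glue/Hive table of click events
click_stream_hive_config = HiveConfig(
    database="click_stream_db",   # hypothetical Hive/Glue database
    table="click_events",         # hypothetical table of raw click events
    timestamp_field="event_ts",   # column holding each record's timestamp
)

click_stream_hive_ds = BatchSource(
    name="click_stream_hive_ds",
    batch_config=click_stream_hive_config,
)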
Example
The following example defines a batch source based on a Snowflake table.
from tecton import BatchSource, SnowflakeConfig

# Connection details for the Snowflake table holding raw click events
click_stream_snowflake_ds = SnowflakeConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

# The batch source that Feature Views reference
clickstream_snowflake_ds = BatchSource(
    name="click_stream_snowflake_ds",
    batch_config=click_stream_snowflake_ds,
)
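To illustrate how a batch source is consumed, the hedged sketch below shows a BatchFeatureView reading from clickstream_snowflake_ds. The user entity, the decorator parameters, and the SQL are illustrative assumptions based on typical Tecton SDK usage, not part of the example above; exact signatures vary by SDK version, so check the Feature View documentation.
from datetime import datetime, timedelta
from tecton import Entity, batch_feature_view

# Assumed entity keyed on a user_id column (illustrative only)
user = Entity(name="user", join_keys=["user_id"])

# Hypothetical feature view that reads raw rows from the batch source above.
# Decorator parameters are typical of the Tecton SDK but may differ by version.
@batch_feature_view(
    sources=[clickstream_snowflake_ds],
    entities=[user],
    mode="spark_sql",
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2023, 1, 1),
)
def user_click_events(click_stream):
    return f"SELECT user_id, timestamp, 1 AS click FROM {click_stream}"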
Stream Sources
Tecton makes it easy to provide up-to-date streaming data to your models. Once you've defined a StreamSource, you can create features with a StreamFeatureView.
Stream + Batch Requirement
Tecton backfills and processes streaming events from a single feature pipeline definition. To enable backfills, Tecton requires a StreamSource to define both a streaming input and a log of historical events from a batch source.
Tecton reads from the batch source for backfills, when materializing to the offline store, and when interactively querying historical data using the SDK. Steady-state materialization to the online store is based on the stream source.
Defining a StreamSource
- Create a configuration that is specific to your streaming data source. Tecton currently supports:
  - KinesisConfig: an AWS Kinesis stream
  - KafkaConfig: a Kafka topic
  The complete list of configurations can be found in the API Reference.
- Create a configuration for your batch data source, e.g. HiveConfig or SnowflakeConfig.
- Define a StreamSource that references the configurations defined in the previous steps:
  - name: A unique identifier for the stream source. For example, "click_event_log".
  - batch_config: The batch configuration created in the second step.
  - stream_config: The stream configuration created in the first step.
Streaming Message Deserialization
A StreamSource requires instructions for deserializing each message. These are provided through the post_processor parameter, whose value is a function containing PySpark code that deserializes a raw PySpark DataFrame of stream messages. The function maps the schema of the stream's raw data to the batch data source's schema. When Tecton runs the function, it validates that the schemas match.
Building on the example above, here's a deserialization function that unpacks a payload nested in a raw JSON event envelope:
def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StringType, StructType, TimestampType

    # Schema of the JSON payload nested inside each event envelope
    PAYLOAD_SCHEMA = (
        StructType()
        .add("accountId", StringType(), False)
        .add("description", StringType(), False)
        .add("transaction_id", StringType(), False)
    )

    # Schema of the outer event envelope
    EVENT_ENVELOPE_SCHEMA = (
        StructType()
        .add("timestamp", TimestampType(), False)
        .add("payload", PAYLOAD_SCHEMA, False)
    )

    # Parse the raw message value as JSON, then flatten the fields that the
    # batch data source's schema expects
    value = col("value").cast("string")
    df = df.withColumn("event", from_json(value, EVENT_ENVELOPE_SCHEMA))
    df = df.withColumn("accountId", col("event.payload.accountId"))
    df = df.withColumn("description", col("event.payload.description"))
    df = df.withColumn("transaction_id", col("event.payload.transaction_id"))
    df = df.withColumn("timestamp", col("event.timestamp"))
    return df
The deserialization function is then passed to the stream configuration through the post_processor parameter:
click_stream_kafka_ds = KafkaConfig(
    watermark_delay_threshold=timedelta(days=7),
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_field="timestamp",
    post_processor=raw_data_deserialization,
)
Example
With the stream translator defined above, the sample below defines a StreamSource backed by a Kafka topic and a Snowflake table.
from datetime import timedelta
from tecton import KafkaConfig, SnowflakeConfig, StreamSource

# Batch log of historical click events, used for backfills and the offline store
click_stream_snowflake_ds = SnowflakeConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

# Stream of live click events, used for steady-state materialization to the online store
click_stream_kafka_ds = KafkaConfig(
    watermark_delay_threshold=timedelta(days=7),
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_field="timestamp",
    post_processor=raw_data_deserialization,
)

click_stream_ds = StreamSource(
    name="click_stream_ds",
    batch_config=click_stream_snowflake_ds,
    stream_config=click_stream_kafka_ds,
)
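If the click events arrive on an AWS Kinesis stream instead of a Kafka topic, a KinesisConfig can take the place of the KafkaConfig. The sketch below is a hedged illustration: the stream name and region are hypothetical, and the KinesisConfig parameters shown (stream_name, region, initial_stream_position, timestamp_field, post_processor) should be checked against the API Reference.
from tecton import KinesisConfig, StreamSource

# Sketch: the same StreamSource, backed by a hypothetical Kinesis stream
click_stream_kinesis_ds = KinesisConfig(
    stream_name="click-events",               # hypothetical Kinesis stream name
    region="eu-west-1",                       # hypothetical AWS region
    initial_stream_position="latest",         # where to start reading in steady state
    timestamp_field="timestamp",
    post_processor=raw_data_deserialization,  # reuse the same deserialization function
)

click_stream_kinesis_source = StreamSource(
    name="click_stream_kinesis_source",
    batch_config=click_stream_snowflake_ds,
    stream_config=click_stream_kinesis_ds,
)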