Kafka- and Kinesis-based Stream Sources are supported with Tecton on Databricks and EMR. To bring streaming data into Tecton with Rift, use the Stream Ingest API.
Stream Data Sources with Kafka or Kinesis
This guide shows you how to create a Tecton StreamSource that reads data from
Kafka or Kinesis. The StreamSource can later be used by a StreamFeatureView,
which generates feature values from the raw data that is retrieved by the
StreamSource.
Using a notebook, you will build the StreamSource incrementally. When the
StreamSource is complete, you will apply it in a Tecton repo.
Create and set up a new notebook
On your data platform (Databricks or EMR), create a new notebook.
Databricks
Follow these instructions to connect a notebook to your Databricks cluster.
EMR
Follow these instructions. Note that specific JAR files need to be installed to use a notebook with Kinesis and Kafka.
Import modules needed to run the notebook:
import tecton
import pandas
from datetime import datetime, timedelta
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import (
StructType,
StructField,
StringType,
DoubleType,
TimestampType,
BooleanType,
IntegerType,
LongType,
)
import dateutil.parser
import tempfile
Verify you can read data directly from the stream source and its corresponding batch source
Read data directly from the batch source
batch_data_source = "demo_fraud_v2.transactions"
batch_data = spark.sql(f"SELECT * FROM {batch_data_source} LIMIT 10")
batch_data.show()
Read data directly from the stream source
The following helper functions are used when reading data directly from the stream source:
# writes data to a table as it is read from the stream
def write_streaming_data_to_table(stream, stream_output_table):
with tempfile.TemporaryDirectory() as d:
(
stream.writeStream.format("memory")
.queryName(stream_output_table)
.option("checkpointLocation", d)
.outputMode("append")
.start()
)
# queries the data in the table and displays the output
def query_streaming_table(stream_output_table):
stream_data = spark.sql("SELECT * FROM " + stream_output_table + " LIMIT 10")
stream_data.show()
- Kinesis (Databricks)
- Kinesis (EMR)
- Kafka
stream_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amount", DoubleType(), False),
StructField("is_fraud", LongType(), False),
StructField("merchant", StringType(), False),
StructField("merchant_lat", DoubleType(), False),
StructField("merchant_long", DoubleType(), False),
StructField("timestamp", StringType(), False),
]
)
binary_stream = (
spark.readStream.format("kinesis")
.option("streamName", "<stream name>")
.option("region", "<region>")
.option("roleArn", "<role ARN>")
.option("initialPosition", "earliest")
.load()
)
json_stream = (
binary_stream.selectExpr("cast (data as STRING) jsonData")
.select(from_json("jsonData", stream_schema).alias("s"))
.select("s.*")
)
stream_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amount", DoubleType(), False),
StructField("is_fraud", LongType(), False),
StructField("merchant", StringType(), False),
StructField("merchant_lat", DoubleType(), False),
StructField("merchant_long", DoubleType(), False),
StructField("timestamp", StringType(), False),
]
)
binary_stream = (
spark.readStream.format("kinesis")
.option("streamName", "<stream name>")
.option("awsSTSRoleARN", "<role ARN>")
.option("awsSTSSessionName", "tecton-materialization")
.option("startingPosition", "earliest")
.option("kinesis.client.describeShardInterval", "30s")
.option("endpointUrl", "https://<region>.amazonaws.com")
.load()
)
json_stream = (
binary_stream.selectExpr("cast (data as STRING) jsonData")
.select(from_json("jsonData", stream_schema).alias("s"))
.select("s.*")
)
stream_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amount", DoubleType(), False),
StructField("is_fraud", LongType(), False),
StructField("merchant", StringType(), False),
StructField("merchant_lat", DoubleType(), False),
StructField("merchant_long", DoubleType(), False),
StructField("timestamp", StringType(), False),
]
)
binary_stream = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "<URL(s) of Kafka broker(s)>")
.option("subscribe", "<Kafka topic name>")
.option("startingOffsets", "earliest")
.load()
)
# Additional options needed depending on your Kafka connectivity method,
# such as TLS or SASL. For example, if connecting using TLS, specify
# kafka.security.protocol (with the value "SSL"), kafka.ssl.keystore.password,
# kafka.ssl.truststore.location, and kafka.ssl.keystore.location.
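# Illustrative TLS example (keystore/truststore locations and the password are
# placeholders for your environment); these options would be added to the
# readStream chain above:
#
#     .option("kafka.security.protocol", "SSL")
#     .option("kafka.ssl.keystore.location", "/tmp/keystore.jks")
#     .option("kafka.ssl.keystore.password", "<keystore password>")
#     .option("kafka.ssl.truststore.location", "/tmp/truststore.jks")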
json_stream = (
binary_stream.selectExpr("cast (value as STRING) jsonData")
.select(from_json("jsonData", stream_schema).alias("s"))
.select("s.*")
)
Write data to a table as it is read from the stream:
Only let the following command run for a short period of time, because it continuously reads data directly from the stream.
write_streaming_data_to_table(json_stream, "stream_output_table_json")
Query the data in the table and display the output:
query_streaming_table("stream_output_table_json")
If no data is returned after running the previous command, run the command again after a short period of time.
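When you are finished inspecting the output, stop the streaming queries the helper started so the notebook does not keep reading from the stream. A minimal sketch using Spark's standard streaming-query API:
for query in spark.streams.active:
    # Stop every active streaming query started in this notebook session.
    print(f"Stopping query: {query.name}")
    query.stop()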
Map the fields of the stream source to the batch source
Now that we have verified connectivity to the data stream, it is time to focus on translating and mapping the stream fields. The post processor function, referenced in the post_processor parameter of a KafkaConfig or KinesisConfig object, maps the fields of the stream source to the fields of the batch source. The function works as follows.
- The function accepts a Spark DataFrame as input. The input contains one row, with one column, which is a JSON string.
  - If Kinesis is being used, the name of the column is data.
  - If Kafka is being used, the name of the column is value.
- Using from_json(), the function converts the column containing the JSON input to a DataFrame, which will eventually be the output of the function. Each column of this DataFrame is:
  - Cast to the data type of the corresponding column in the batch source
  - Renamed to match the name of the corresponding column in the batch source
- The function returns the resulting DataFrame from the previous step.
Example mapping
The following table shows an example mapping of stream source columns to batch source columns, as well as the data type of each batch source column. Note that some of the stream column and batch column names are different.
| Stream column name | Batch column name | Batch column data type |
|---|---|---|
| user_id | user_id | string |
| transaction_id | transaction_id | string |
| category | category | string |
| amount | amt | double |
| is_fraud | is_fraud | long |
| merchant | merchant | string |
| merchant_latitude | merch_lat | double |
| merchant_longitude | merch_long | double |
| timestamp | timestamp | string |
Write the stream message post processor function
The following function implements the mapping that is shown in the table above.
- Kinesis
- Kafka
def translate_stream(df):
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import (
StructType,
StructField,
StringType,
DoubleType,
TimestampType,
BooleanType,
IntegerType,
)
stream_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amount", StringType(), False),
StructField("is_fraud", StringType(), False),
StructField("merchant", StringType(), False),
StructField("merchant_latitude", StringType(), False),
StructField("merchant_longitude", StringType(), False),
StructField("timestamp", StringType(), False),
]
)
return (
df.selectExpr("cast (data as STRING) jsonData")
.select(from_json("jsonData", stream_schema).alias("payload"))
.select(
col("payload.user_id").alias("user_id"),
col("payload.transaction_id").alias("transaction_id"),
col("payload.category").alias("category"),
col("payload.amount").cast("double").alias("amt"),
col("payload.is_fraud").cast("long").alias("is_fraud"),
col("payload.merchant").alias("merchant"),
col("payload.merchant_latitude").cast("double").alias("merch_lat"),
col("payload.merchant_longitude").cast("double").alias("merch_long"),
from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
)
)
def translate_stream(df):
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import (
StructType,
StructField,
StringType,
DoubleType,
TimestampType,
BooleanType,
IntegerType,
)
stream_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amount", StringType(), False),
StructField("is_fraud", StringType(), False),
StructField("merchant", StringType(), False),
StructField("merchant_latitude", StringType(), False),
StructField("merchant_longitude", StringType(), False),
StructField("timestamp", StringType(), False),
]
)
return (
df.selectExpr("cast (value as STRING) jsonData")
.select(from_json("jsonData", stream_schema).alias("payload"))
.select(
col("payload.user_id").alias("user_id"),
col("payload.transaction_id").alias("transaction_id"),
col("payload.category").alias("category"),
col("payload.amount").cast("double").alias("amt"),
col("payload.is_fraud").cast("long").alias("is_fraud"),
col("payload.merchant").alias("merchant"),
col("payload.merchant_latitude").cast("double").alias("merch_lat"),
col("payload.merchant_longitude").cast("double").alias("merch_long"),
from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
)
)
Verify that the batch source and translated stream source are compatible
The schema of the batch source must be a superset of the schema of the translated stream source. The following function performs this check.
def check_batch_stream_schema_compatibility(batch_data, translated_stream_data):
batch_set = set(batch_data.columns)
stream_set = set(translated_stream_data.columns)
if stream_set.issubset(batch_set):
print("Success! Schemas are aligned.")
print("\nThe following batch source columns are unused in the stream:")
print(batch_set.difference(stream_set))
else:
print("Error: Columns from the translated stream source are not present in the batch source:")
print(stream_set.difference(batch_set))
print("\nStream Source Columns:")
print(translated_stream_data.columns)
print("\nBatch Source Columns:")
print(batch_data.columns)
check_batch_stream_schema_compatibility(batch_data, translate_stream(binary_stream))
Run the post processor function
Run the post processor function to verify that the columns in the stream source are mapped to the batch source, as expected.
translated_stream = translate_stream(binary_stream)
Write data to a table as it is read from the stream:
Only let the following command run for a short period of time, because it continuously reads data directly from the stream.
write_streaming_data_to_table(translated_stream, "translated_stream_output_table")
Query the data in the table and display the output:
query_streaming_table("translated_stream_output_table")
If no data is returned after running the previous command, run the command again after a short period of time.
Create the StreamSource
Using the configurations you used earlier to connect to the batch source and
stream source, as well as the stream post processor function you defined
earlier, define the
StreamSource using the
following steps.
1. Define a KinesisConfig or KafkaConfig object for your stream source. As part of the object definition, set post_processor to the name of the stream message post processor function that you wrote earlier. Find the complete list of data source configuration classes in the API Reference.
2. Define a config object, such as HiveConfig, that is specific to the type of your batch source.
3. Define the StreamSource object, where you set batch_config and stream_config to the config objects you defined in steps 1 and 2.
Write the StreamSource definition
A StreamSource contains these configurations:
- stream_config: The configuration for a stream source, which contains parameters for connecting to Kinesis or Kafka.
- batch_config: The configuration for a batch source that backs the stream source; the batch source contains the stream's historical data.
The value of these configs can be a config object (such as HiveConfig or KafkaConfig) or a Data Source Function. A Data Source Function offers more flexibility than a config object; a minimal sketch of this approach follows.
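The following sketch assumes the spark_batch_config and spark_stream_config decorators from the Tecton SDK, reuses the demo table and placeholder Kinesis settings from this guide, and assumes the translate_stream function defined earlier is available in the same file; adapt it to your own sources.
from tecton import StreamSource, spark_batch_config, spark_stream_config
@spark_batch_config()
def transactions_batch_df(spark):
    # Return the historical transactions as a Spark DataFrame.
    return spark.sql("SELECT * FROM demo_fraud_v2.transactions")
@spark_stream_config()
def transactions_stream_df(spark):
    # Read the raw stream and apply the same field mapping as the post
    # processor defined earlier in this guide.
    raw = (
        spark.readStream.format("kinesis")
        .option("streamName", "<stream name>")
        .option("region", "<region>")
        .option("initialPosition", "earliest")
        .load()
    )
    return translate_stream(raw)
transactions_stream_dsf = StreamSource(
    name="transactions_stream_dsf",
    stream_config=transactions_stream_df,
    batch_config=transactions_batch_df,
)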
A StreamSource is used by a StreamFeatureView to generate feature values
using data from both the stream and batch sources.
- Kinesis (Databricks)
- Kinesis (EMR)
- Kafka
from tecton import (
HiveConfig,
KinesisConfig,
StreamSource,
BatchSource,
DatetimePartitionColumn,
)
from datetime import timedelta
def transactions_stream_translator(df):
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import (
StructType,
StructField,
StringType,
DoubleType,
TimestampType,
BooleanType,
IntegerType,
)
stream_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amount", StringType(), False),
StructField("is_fraud", StringType(), False),
StructField("merchant", StringType(), False),
StructField("merchant_latitude", StringType(), False),
StructField("merchant_longitude", StringType(), False),
StructField("timestamp", StringType(), False),
]
)
return (
df.selectExpr("cast (data as STRING) jsonData")
.select(from_json("jsonData", stream_schema).alias("payload"))
.select(
col("payload.user_id").alias("user_id"),
col("payload.transaction_id").alias("transaction_id"),
col("payload.category").alias("category"),
col("payload.amount").cast("double").alias("amt"),
col("payload.is_fraud").cast("long").alias("is_fraud"),
col("payload.merchant").alias("merchant"),
col("payload.merchant_latitude").cast("double").alias("merch_lat"),
col("payload.merchant_longitude").cast("double").alias("merch_long"),
from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
)
)
partition_columns = [
DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]
batch_config = HiveConfig(
database="demo_fraud_v2",
table="transactions",
timestamp_field="timestamp",
datetime_partition_columns=partition_columns,
)
stream_config = KinesisConfig(
stream_name="<stream name>",
region="<region>",
initial_stream_position="earliest",
timestamp_field="timestamp",
post_processor=transactions_stream_translator,
options={"roleArn": "<role ARN>"},
)
transactions_stream = StreamSource(
name="transactions_stream",
stream_config=stream_config,
batch_config=batch_config,
)
from tecton import (
HiveConfig,
KinesisConfig,
StreamSource,
BatchSource,
DatetimePartitionColumn,
)
from datetime import timedelta
def transactions_stream_translator(df):
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import (
StructType,
StructField,
StringType,
DoubleType,
TimestampType,
BooleanType,
IntegerType,
)
stream_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amount", StringType(), False),
StructField("is_fraud", StringType(), False),
StructField("merchant", StringType(), False),
StructField("merchant_latitude", StringType(), False),
StructField("merchant_longitude", StringType(), False),
StructField("timestamp", StringType(), False),
]
)
return (
df.selectExpr("cast (data as STRING) jsonData")
.select(from_json("jsonData", stream_schema).alias("payload"))
.select(
col("payload.user_id").alias("user_id"),
col("payload.transaction_id").alias("transaction_id"),
col("payload.category").alias("category"),
col("payload.amount").cast("double").alias("amt"),
col("payload.is_fraud").cast("long").alias("is_fraud"),
col("payload.merchant").alias("merchant"),
col("payload.merchant_latitude").cast("double").alias("merch_lat"),
col("payload.merchant_longitude").cast("double").alias("merch_long"),
from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
)
)
partition_columns = [
DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]
batch_config = HiveConfig(
database="demo_fraud_v2",
table="transactions",
timestamp_field="timestamp",
datetime_partition_columns=partition_columns,
)
stream_config = KinesisConfig(
stream_name="<stream name>",
region="<region name>",
initial_stream_position="earliest",
timestamp_field="timestamp",
post_processor=transactions_stream_translator,
options={"awsSTSRoleARN": "<role ARN>"},
)
transactions_stream = StreamSource(
name="transactions_stream",
stream_config=stream_config,
batch_config=batch_config,
)
Note: In the KafkaConfig definition below, additional
parameters are needed depending on your Kafka connectivity method. See
Connecting
Kafka Streams for more details.
from tecton import (
HiveConfig,
KafkaConfig,
StreamSource,
BatchSource,
DatetimePartitionColumn,
)
from datetime import timedelta
import os
def translate_stream(df):
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import (
StructType,
StructField,
StringType,
DoubleType,
TimestampType,
BooleanType,
IntegerType,
)
stream_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amount", StringType(), False),
StructField("is_fraud", StringType(), False),
StructField("merchant", StringType(), False),
StructField("merchant_latitude", StringType(), False),
StructField("merchant_longitude", StringType(), False),
StructField("timestamp", StringType(), False),
]
)
return (
df.selectExpr("cast (value as STRING) jsonData")
.select(from_json("jsonData", stream_schema).alias("payload"))
.select(
col("payload.user_id").alias("user_id"),
col("payload.transaction_id").alias("transaction_id"),
col("payload.category").alias("category"),
col("payload.amount").cast("double").alias("amt"),
col("payload.is_fraud").cast("long").alias("is_fraud"),
col("payload.merchant").alias("merchant"),
col("payload.merchant_latitude").cast("double").alias("merch_lat"),
col("payload.merchant_longitude").cast("double").alias("merch_long"),
from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
)
)
partition_columns = [
DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]
batch_config = HiveConfig(
database="demo_fraud_v2",
table="transactions",
timestamp_field="timestamp",
datetime_partition_columns=partition_columns,
)
stream_config = KafkaConfig(
kafka_bootstrap_servers="<URL(s) of Kafka broker(s)>",
topics="<topic name>",
post_processor=translate_stream,
timestamp_field="timestamp",
)
transactions_stream = StreamSource(name="transactions_stream", stream_config=stream_config, batch_config=batch_config)
When using Kafka, be aware that by default Spark consumes all existing data in the topic into a single micro-batch, which can overwhelm the Spark cluster when backfilling from a stream for the first time. To avoid this, set the maxOffsetsPerTrigger property to rate limit Spark Structured Streaming when connecting to Kafka. This caps how much data is fetched from Kafka between each trigger.
For example, add this option to your KafkaConfig:
stream_config = KafkaConfig(
kafka_bootstrap_servers="<URL(s) of Kafka broker(s)>",
topics="<Kafka topic name>",
options={
"maxOffsetsPerTrigger": "200000",
},
)
Test the StreamSource
You can verify that your StreamSource is connected properly by following the
Testing Data Sources guide.
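As an illustration, a quick interactive check after applying your repo might look like the following. This is a hedged sketch: it assumes the repo was applied to a workspace named prod and that your SDK version provides get_dataframe and start_stream_preview on the data source object; see the Testing Data Sources guide for the authoritative steps.
import tecton
from datetime import datetime, timedelta
ws = tecton.get_workspace("prod")  # assumption: workspace name
ds = ws.get_data_source("transactions_stream")
# Preview the batch side of the StreamSource over the last week.
batch_preview = ds.get_dataframe(
    start_time=datetime.utcnow() - timedelta(days=7),
    end_time=datetime.utcnow(),
)
batch_preview.to_spark().show(10)
# Preview the stream side by writing incoming records to a temporary table.
ds.start_stream_preview("transactions_stream_preview")
spark.sql("SELECT * FROM transactions_stream_preview LIMIT 10").show()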
Create a Stream Feature View that uses the Stream Source
from datetime import datetime
from datetime import timedelta
from entities import user
from transactions import transactions_stream
from tecton import stream_feature_view, Attribute
from tecton.types import Field, Int64, Float64, String
@stream_feature_view(
source=transactions_stream,
entities=[user],
timestamp_field="timestamp",
features=[
Attribute("transaction_id", Int64),
Attribute("category", String),
Attribute("amt", Int64),
Attribute("is_fraud", Bool),
Attribute("merchant", String),
Attribute("merch_lat", Int64),
Attribute("merch_long", Int64),
],
mode="spark_sql",
online=True,
offline=True,
feature_start_time=datetime(2022, 5, 20),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=30),
)
def stream_features(transactions_stream):
return f"""
SELECT
user_id,
transaction_id,
category,
amt,
is_fraud,
merchant,
merch_lat,
merch_long,
timestamp
FROM
{transactions_stream}
"""
A StreamFeatureView applies the same transformation to both the stream and the
batch sources. If the data from the stream and batch sources do not have the
same schema, then define a post_processor function for at least one data
source to output the expected schema for the transformation.