Create a Streaming Data Source

This guide shows you how to create a Tecton StreamSource that reads data from Kafka or Kinesis. The StreamSource can later be used by a StreamFeatureView, which generates feature values from the raw data that is retrieved by the StreamSource.

Using a notebook, you will build the StreamSource incrementally. When the StreamSource is complete, you will apply it in a Tecton repo.

Note

Before proceeding, get a foundational understanding of stream sources and what they contain.

The following applies to this guide:

  • The steps apply to Databricks and EMR, unless noted otherwise.

  • The term "batch source" refers to the batch source which backs a stream source; the batch source contains the stream's historical data.

    If you are using Kinesis as your stream source, you can use Kinesis Data Firehose to send the streaming data to the batch source.

  • The transactions table in the demo_fraud_v2 database is used as an example batch source. transactions_stream is used as an example stream source.

Create and set up a new notebook

On your data platform (Databricks or EMR), create a new notebook.

  • Databricks: Follow these instructions to connect a notebook to your Databricks cluster.

  • EMR: Follow these instructions to connect a notebook to your EMR cluster. Note that specific JAR files must be installed to use a notebook with Kinesis and Kafka.

Import modules needed to run the notebook:

import tecton
import pandas
from datetime import datetime, timedelta
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, BooleanType, IntegerType, LongType
import dateutil.parser
import tempfile

Verify you can read data directly from the stream source and its corresponding batch source

Read data directly from the batch source

batch_data_source = "demo_fraud_v2.transactions"
batch_data = spark.sql(f"SELECT * FROM {batch_data_source} LIMIT 10")
batch_data.show()

Read data directly from the stream source

The following helper functions are used when reading data directly from the stream source:

* `write_streaming_data_to_table` writes data to a table as it is read from the stream
* `query_streaming_table` queries the data in the table and displays the output

def write_streaming_data_to_table(stream, stream_output_table):
    with tempfile.TemporaryDirectory() as d:
        (
            stream.writeStream.format("memory")
            .queryName(stream_output_table)
            .option("checkpointLocation", d)
            .outputMode("append")
            .start()
        )

def query_streaming_table(stream_output_table):
    stream_data = spark.sql("SELECT * FROM " + stream_output_table + " LIMIT 10")
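    stream_data.show()

Define the schema of the stream messages and read from the stream. If you are using Kinesis on Databricks:

stream_schema = StructType([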
    StructField('user_id', StringType(), False),
    StructField('transaction_id', StringType(), False),
    StructField('category', StringType(), False),
    StructField('amount', DoubleType(), False),
    StructField('is_fraud', LongType(), False),
    StructField('merchant', StringType(), False),
    StructField('merchant_lat', DoubleType(), False),
    StructField('merchant_long', DoubleType(), False),
    StructField('timestamp', StringType(), False),
])

binary_stream = spark.readStream \
    .format("kinesis") \
    .option("streamName", "<stream name>") \
    .option("region", "<region>") \
    .option("roleArn", "<role ARN>") \
    .option("initialPosition", "earliest") \
    .load()

json_stream = binary_stream \
    .selectExpr("cast (data as STRING) jsonData") \
    .select(from_json("jsonData", stream_schema).alias("s")) \
    .select("s.*")

If you are using Kinesis on EMR:

stream_schema = StructType([
    StructField('user_id', StringType(), False),
    StructField('transaction_id', StringType(), False),
    StructField('category', StringType(), False),
    StructField('amount', DoubleType(), False),
    StructField('is_fraud', LongType(), False),
    StructField('merchant', StringType(), False),
    StructField('merchant_lat', DoubleType(), False),
    StructField('merchant_long', DoubleType(), False),
    StructField('timestamp', StringType(), False),
])

binary_stream = spark.readStream \
    .format("kinesis") \
    .option("streamName", "<stream name>") \
    .option("awsSTSRoleARN", "<role ARN>") \
    .option("awsSTSSessionName","tecton-materialization") \
    .option("startingPosition", "earliest") \
    .option("kinesis.client.describeShardInterval", "30s") \
    .option("endpointUrl","https://<region>.amazonaws.com") \
    .load()

json_stream = binary_stream \
    .selectExpr("cast (data as STRING) jsonData") \
    .select(from_json("jsonData", stream_schema).alias("s")) \
    .select("s.*")

If you are using Kafka:

stream_schema = StructType([
    StructField('user_id', StringType(), False),
    StructField('transaction_id', StringType(), False),
    StructField('category', StringType(), False),
    StructField('amount', DoubleType(), False),
    StructField('is_fraud', LongType(), False),
    StructField('merchant', StringType(), False),
    StructField('merchant_lat', DoubleType(), False),
    StructField('merchant_long', DoubleType(), False),
    StructField('timestamp', StringType(), False),
])

# Additional options are needed depending on your Kafka connectivity method,
# such as TLS or SASL. For example, if connecting using TLS, specify
# kafka.security.protocol (with the value "SSL"), kafka.ssl.keystore.password,
# kafka.ssl.truststore.location, and kafka.ssl.keystore.location.
binary_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<URL(s) of Kafka broker(s)>")
    .option("subscribe", "<Kafka topic name>")
    .option("startingOffsets", "earliest")
    .load()
)

json_stream = binary_stream \
    .selectExpr("cast (value as STRING) jsonData") \
    .select(from_json("jsonData", stream_schema).alias("s")) \
    .select("s.*")
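
The TLS options named in the Kafka example comments can be collected in a dictionary and passed to the reader in one call. The following is a minimal sketch rather than a definitive configuration: the helper name kafka_tls_options and the environment variable names are hypothetical, and only the option keys come from the example above.

```python
import os


def kafka_tls_options(keystore_location, truststore_location, keystore_password):
    """Return the Spark Kafka reader options for a TLS connection."""
    return {
        "kafka.security.protocol": "SSL",
        "kafka.ssl.keystore.location": keystore_location,
        "kafka.ssl.truststore.location": truststore_location,
        "kafka.ssl.keystore.password": keystore_password,
    }


# Hypothetical environment variable names; read secrets from your own secret store.
tls_options = kafka_tls_options(
    keystore_location=os.environ.get("KAFKA_KEYSTORE_PATH", "<keystore path>"),
    truststore_location=os.environ.get("KAFKA_TRUSTSTORE_PATH", "<truststore path>"),
    keystore_password=os.environ.get("KAFKA_KEYSTORE_PASSWORD", "<keystore password>"),
)

# In the notebook, the options could then be applied to the reader in one call:
# binary_stream = (
#     spark.readStream.format("kafka")
#     .option("kafka.bootstrap.servers", "<URL(s) of Kafka broker(s)>")
#     .option("subscribe", "<Kafka topic name>")
#     .options(**tls_options)
#     .load()
# )
```

Keeping the password out of the notebook source avoids committing it along with the rest of the code.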

Write data to a table as it is read from the stream:

Note

The following command should only be run for a short period of time. The command will continuously read data directly from the stream.

write_streaming_data_to_table(json_stream,"stream_output_table_json")

Query the data in the table and display the output:

query_streaming_table("stream_output_table_json")

If no data is returned after running the previous command, run the command again after a short period of time.
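
Instead of rerunning the query by hand, you could poll until rows appear. The following is a minimal sketch, assuming a hypothetical helper wait_for_rows that accepts any zero-argument function returning a row count; neither the helper nor its parameters are part of Tecton or Spark.

```python
import time


def wait_for_rows(count_rows, attempts=5, delay_seconds=10.0):
    """Call count_rows() until it returns a positive number or attempts run out.

    Returns the last row count observed (0 if no rows ever arrived).
    """
    count = 0
    for attempt in range(attempts):
        count = count_rows()
        if count > 0:
            break
        if attempt < attempts - 1:
            time.sleep(delay_seconds)  # give the stream time to deliver records
    return count


# In the notebook, count_rows could wrap the in-memory table, for example:
# wait_for_rows(lambda: spark.table("stream_output_table_json").count())
```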

Map the fields of the stream source to the batch source

The post_processor parameter of a KafkaConfig or KinesisConfig object references a function that maps the fields of the stream source to the batch source. The function works as follows.

  • The function accepts a Spark DataFrame as input. The input contains one row, with one column, which is a JSON string.

    • If Kinesis is being used, the name of the column is data.
    • If Kafka is being used, the name of the column is value.
  • Using from_json(), the function converts the column containing the JSON input to a DataFrame, which will eventually be the output of the function. Each column of this DataFrame is:

    • Cast to the data type of the corresponding column in the batch source
    • Renamed to match the name of the corresponding column in the batch source
  • The function returns the resulting DataFrame from the previous step.

Example mapping

The following table shows an example mapping of stream source columns to batch source columns, as well as the data type of each batch source column. Note that some of the stream column and batch column names are different.

| Stream column name | Batch column name | Batch column data type |
| --- | --- | --- |
| user_id | user_id | string |
| transaction_id | transaction_id | string |
| category | category | string |
| amount | amt | double |
| is_fraud | is_fraud | long |
| merchant | merchant | string |
| merchant_latitude | merch_lat | double |
| merchant_longitude | merch_long | double |
| timestamp | timestamp | string |
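
To sanity-check the mapping on a single message before running it against the stream, the table above can be expressed in plain Python. This is an illustrative sketch only: translate_record, COLUMN_MAPPING, and the sample message are hypothetical, and the sketch does not replace the Spark post processor function.

```python
import json

# stream column name -> (batch column name, cast function), taken from the table above
COLUMN_MAPPING = {
    "user_id": ("user_id", str),
    "transaction_id": ("transaction_id", str),
    "category": ("category", str),
    "amount": ("amt", float),
    "is_fraud": ("is_fraud", int),
    "merchant": ("merchant", str),
    "merchant_latitude": ("merch_lat", float),
    "merchant_longitude": ("merch_long", float),
    "timestamp": ("timestamp", str),
}


def translate_record(json_message):
    """Parse one JSON message, then rename and cast each mapped field."""
    payload = json.loads(json_message)
    return {
        batch_name: cast(payload[stream_name])
        for stream_name, (batch_name, cast) in COLUMN_MAPPING.items()
    }


# Made-up sample message, for illustration only.
sample = json.dumps({
    "user_id": "user_1", "transaction_id": "txn_1", "category": "grocery",
    "amount": "12.5", "is_fraud": "0", "merchant": "example_merchant",
    "merchant_latitude": "40.7", "merchant_longitude": "-74.0",
    "timestamp": "2022-05-20T00:00:00Z",
})
print(translate_record(sample))
```

A message that lacks a mapped field raises a KeyError here, which makes missing fields easy to spot while testing.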

Write the stream message post processor function

The following function implements the mapping shown in the table above. If you are using Kinesis, the function reads the data column:

def translate_stream(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, BooleanType, IntegerType

    stream_schema = StructType([
        StructField('user_id', StringType(), False),
        StructField('transaction_id', StringType(), False),
        StructField('category', StringType(), False),
        StructField('amount', StringType(), False),
        StructField('is_fraud', StringType(), False),
        StructField('merchant', StringType(), False),
        StructField('merchant_latitude', StringType(), False),
        StructField('merchant_longitude', StringType(), False),
        StructField('timestamp', StringType(), False),
    ])

    return (
        df.selectExpr('cast (data as STRING) jsonData')
        .select(from_json('jsonData', stream_schema).alias('payload'))
        .select(
            col('payload.user_id').alias('user_id'),
            col('payload.transaction_id').alias('transaction_id'),
            col('payload.category').alias('category'),
            col('payload.amount').cast('double').alias('amt'),
            col('payload.is_fraud').cast('long').alias('is_fraud'),
            col('payload.merchant').alias('merchant'),
            col('payload.merchant_latitude').cast('double').alias('merch_lat'),
            col('payload.merchant_longitude').cast('double').alias('merch_long'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

If you are using Kafka, the function reads the value column:

def translate_stream(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, BooleanType, IntegerType

    stream_schema = StructType([
        StructField('user_id', StringType(), False),
        StructField('transaction_id', StringType(), False),
        StructField('category', StringType(), False),
        StructField('amount', StringType(), False),
        StructField('is_fraud', StringType(), False),
        StructField('merchant', StringType(), False),
        StructField('merchant_latitude', StringType(), False),
        StructField('merchant_longitude', StringType(), False),
        StructField('timestamp', StringType(), False),
    ])

    return (
        df.selectExpr('cast (value as STRING) jsonData')
        .select(from_json('jsonData', stream_schema).alias('payload'))
        .select(
            col('payload.user_id').alias('user_id'),
            col('payload.transaction_id').alias('transaction_id'),
            col('payload.category').alias('category'),
            col('payload.amount').cast('double').alias('amt'),
            col('payload.is_fraud').cast('long').alias('is_fraud'),
            col('payload.merchant').alias('merchant'),
            col('payload.merchant_latitude').cast('double').alias('merch_lat'),
            col('payload.merchant_longitude').cast('double').alias('merch_long'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

Verify that the batch source and translated stream source are compatible

The schema of the batch source must be a superset of the schema of the translated stream source. The following function performs this check.

def check_batch_stream_schema_compatibility(batch_data, translated_stream_data):
    batch_set = set(batch_data.columns)
    stream_set = set(translated_stream_data.columns)

    if stream_set.issubset(batch_set):
        print("Success! Schemas are aligned.")
        print("\nThe following batch source columns are unused in the stream:")
        print(batch_set.difference(stream_set))
    else:
        print("Error: Columns from the translated stream source are not present in the batch source:")
        print(stream_set.difference(batch_set))
        print("\nStream Source Columns:")
        print(translated_stream_data.columns)
        print("\nBatch Source Columns:")
        print(batch_data.columns)

check_batch_stream_schema_compatibility(batch_data, translate_stream(binary_stream))
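
The check above compares column names only. A possible extension, sketched here as an assumption rather than a Tecton requirement, also compares data types using the (name, type) pairs that a Spark DataFrame exposes through its dtypes attribute. The function name find_type_mismatches is hypothetical.

```python
def find_type_mismatches(batch_dtypes, stream_dtypes):
    """Return {column: (stream_type, batch_type)} for shared columns whose types differ.

    Both arguments are lists of (column name, type name) pairs,
    the shape returned by DataFrame.dtypes.
    """
    batch_types = dict(batch_dtypes)
    return {
        name: (stream_type, batch_types[name])
        for name, stream_type in stream_dtypes
        if name in batch_types and batch_types[name] != stream_type
    }


# In the notebook:
# find_type_mismatches(batch_data.dtypes, translate_stream(binary_stream).dtypes)

# Standalone illustration with made-up dtypes:
batch = [("user_id", "string"), ("amt", "double"), ("timestamp", "string")]
stream = [("user_id", "string"), ("amt", "double"), ("timestamp", "timestamp")]
print(find_type_mismatches(batch, stream))
```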

Run the post processor function

Run the post processor function to verify that the columns in the stream source are mapped to the batch source, as expected.

translated_stream = translate_stream(binary_stream)

Write data to a table as it is read from the stream:

Note

The following command should only be run for a short period of time. The command will continuously read data directly from the stream.

write_streaming_data_to_table(translated_stream,"translated_stream_output_table")

Query the data in the table and display the output:

query_streaming_table("translated_stream_output_table")

If no data is returned after running the previous command, run the command again after a short period of time.
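
Because the write commands keep reading from the stream until they are stopped, it helps to stop the queries once you have seen the output. The following is a minimal sketch: stop_queries_named is a hypothetical helper that works on any objects exposing name and stop(), which is the interface of the StreamingQuery objects listed in spark.streams.active.

```python
def stop_queries_named(queries, names):
    """Stop every query whose name is in names; return the names that were stopped."""
    stopped = []
    for query in queries:
        if query.name in names:
            query.stop()
            stopped.append(query.name)
    return stopped


# In the notebook:
# stop_queries_named(
#     spark.streams.active,
#     {"stream_output_table_json", "translated_stream_output_table"},
# )
```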

Create the StreamSource

Using the configurations you used earlier to connect to the batch source and stream source, as well as the stream post processor function you defined earlier, define the StreamSource using the following steps.

  1. Define a config object, such as HiveConfig, that is specific to the type of your batch source.

  2. Define a KinesisConfig or KafkaConfig object for your stream source. As part of the object definition, set post_processor to the name of the streaming message post processor function that you wrote earlier.

    Find the complete list of data source configuration classes in the API Reference.

  3. Define the StreamSource object, where you set batch_config and stream_config to the names of the config objects you defined in steps 1 and 2.

Write the StreamSource definition

If you are using Kinesis on Databricks:

from tecton import HiveConfig, KinesisConfig, StreamSource, BatchSource, DatetimePartitionColumn
from datetime import timedelta

def transactions_stream_translator(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, BooleanType, IntegerType

    stream_schema = StructType([
        StructField('user_id', StringType(), False),
        StructField('transaction_id', StringType(), False),
        StructField('category', StringType(), False),
        StructField('amount', StringType(), False),
        StructField('is_fraud', StringType(), False),
        StructField('merchant', StringType(), False),
        StructField('merchant_latitude', StringType(), False),
        StructField('merchant_longitude', StringType(), False),
        StructField('timestamp', StringType(), False),
    ])

    return (
        df.selectExpr('cast (data as STRING) jsonData')
        .select(from_json('jsonData', stream_schema).alias('payload'))
        .select(
            col('payload.user_id').alias('user_id'),
            col('payload.transaction_id').alias('transaction_id'),
            col('payload.category').alias('category'),
            col('payload.amount').cast('double').alias('amt'),
            col('payload.is_fraud').cast('long').alias('is_fraud'),
            col('payload.merchant').alias('merchant'),
            col('payload.merchant_latitude').cast('double').alias('merch_lat'),
            col('payload.merchant_longitude').cast('double').alias('merch_long'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

partition_columns = [
    DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]

batch_config = HiveConfig(
    database='demo_fraud_v2',
    table='transactions',
    timestamp_field='timestamp',
    datetime_partition_columns=partition_columns,
)

stream_config = KinesisConfig(
    stream_name='<stream name>',
    region='<region>',
    initial_stream_position='earliest',
    timestamp_field='timestamp',
    post_processor=transactions_stream_translator,
    options={'roleArn': '<role ARN>'}
)

transactions_stream = StreamSource(
    name='transactions_stream',
    stream_config=stream_config,
    batch_config=batch_config,
    owner='david@tecton.ai',
    tags={'release': 'production'}
)

If you are using Kinesis on EMR:

from tecton import HiveConfig, KinesisConfig, StreamSource, BatchSource, DatetimePartitionColumn
from datetime import timedelta

def transactions_stream_translator(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, BooleanType, IntegerType

    stream_schema = StructType([
        StructField('user_id', StringType(), False),
        StructField('transaction_id', StringType(), False),
        StructField('category', StringType(), False),
        StructField('amount', StringType(), False),
        StructField('is_fraud', StringType(), False),
        StructField('merchant', StringType(), False),
        StructField('merchant_latitude', StringType(), False),
        StructField('merchant_longitude', StringType(), False),
        StructField('timestamp', StringType(), False),
    ])

    return (
        df.selectExpr('cast (data as STRING) jsonData')
        .select(from_json('jsonData', stream_schema).alias('payload'))
        .select(
            col('payload.user_id').alias('user_id'),
            col('payload.transaction_id').alias('transaction_id'),
            col('payload.category').alias('category'),
            col('payload.amount').cast('double').alias('amt'),
            col('payload.is_fraud').cast('long').alias('is_fraud'),
            col('payload.merchant').alias('merchant'),
            col('payload.merchant_latitude').cast('double').alias('merch_lat'),
            col('payload.merchant_longitude').cast('double').alias('merch_long'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

partition_columns = [
    DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]

batch_config = HiveConfig(
    database='demo_fraud_v2',
    table='transactions',
    timestamp_field='timestamp',
    datetime_partition_columns=partition_columns,
)

stream_config = KinesisConfig(
    stream_name='<stream name>',
    region='<region name>',
    initial_stream_position='earliest',
    timestamp_field='timestamp',
    post_processor=transactions_stream_translator,
    options={'awsSTSRoleARN': '<role ARN>'}
)

transactions_stream = StreamSource(
    name='transactions_stream',
    stream_config=stream_config,
    batch_config=batch_config,
    owner='david@tecton.ai',
    tags={'release': 'production'}
)

Note

In the KafkaConfig definition below, additional parameters are needed depending on your Kafka connectivity method. See Connecting Kafka Streams for more details.

from tecton import HiveConfig, KafkaConfig, StreamSource, BatchSource, DatetimePartitionColumn
from datetime import timedelta
import os

def translate_stream(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, BooleanType, IntegerType

    stream_schema = StructType([
        StructField('user_id', StringType(), False),
        StructField('transaction_id', StringType(), False),
        StructField('category', StringType(), False),
        StructField('amount', StringType(), False),
        StructField('is_fraud', StringType(), False),
        StructField('merchant', StringType(), False),
        StructField('merchant_latitude', StringType(), False),
        StructField('merchant_longitude', StringType(), False),
        StructField('timestamp', StringType(), False),
    ])

    return (
        df.selectExpr('cast (value as STRING) jsonData')
        .select(from_json('jsonData', stream_schema).alias('payload'))
        .select(
            col('payload.user_id').alias('user_id'),
            col('payload.transaction_id').alias('transaction_id'),
            col('payload.category').alias('category'),
            col('payload.amount').cast('double').alias('amt'),
            col('payload.is_fraud').cast('long').alias('is_fraud'),
            col('payload.merchant').alias('merchant'),
            col('payload.merchant_latitude').cast('double').alias('merch_lat'),
            col('payload.merchant_longitude').cast('double').alias('merch_long'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

partition_columns = [
    DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]

batch_config = HiveConfig(
    database='demo_fraud_v2',
    table='transactions',
    timestamp_field='timestamp',
    datetime_partition_columns=partition_columns,
)

stream_config = KafkaConfig(
    kafka_bootstrap_servers="<URL(s) of Kafka broker(s)>",
    topics="<topic name>",
    post_processor=translate_stream,
    timestamp_field="timestamp",
)

transactions_stream = StreamSource(
    name='transactions_stream',
    stream_config=stream_config,
    batch_config=batch_config
)
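
The partition_columns definitions in the examples above describe a batch table partitioned by year, month, and day, with zero-padded values. The sketch below illustrates what such values look like for a given date. It assumes four-digit years and two-digit months and days, which is an assumption about the table layout and not confirmed Tecton behavior; partition_values is a hypothetical name.

```python
from datetime import datetime


def partition_values(ts):
    """Return zero-padded year, month, and day strings for a timestamp."""
    return {
        "partition_0": ts.strftime("%Y"),  # year
        "partition_1": ts.strftime("%m"),  # month, zero padded
        "partition_2": ts.strftime("%d"),  # day, zero padded
    }


print(partition_values(datetime(2022, 5, 3)))
```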

Test the StreamSource

Get the Tecton workspace where your stream source is defined.

ws = tecton.get_workspace("<workspace name>")

Get the stream source:

data_source = ws.get_data_source("transactions_stream")

Verify that Tecton can connect to and read data from the batch source

Set the start and end times that you will use to filter records from the batch source.

end = datetime.now()
start = end - timedelta(days=30)

Call the get_dataframe method of data_source to get data from the batch source, filtered by start and end:

batch_data_from_tecton = data_source.get_dataframe(start_time=start, end_time=end).to_spark().limit(10)
batch_data_from_tecton.show()

Note that although data_source points to a stream source, data_source.get_dataframe() reads records from the batch source.
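
The start and end values act as a time filter on the batch records. The sketch below illustrates the idea on plain Python dictionaries. It assumes a half-open interval (start inclusive, end exclusive), which is an assumption made for illustration and may not match how Tecton treats the boundaries; filter_by_time_range is a hypothetical name.

```python
from datetime import datetime, timedelta


def filter_by_time_range(records, start_time, end_time, timestamp_field="timestamp"):
    """Keep records with start_time <= record[timestamp_field] < end_time."""
    return [r for r in records if start_time <= r[timestamp_field] < end_time]


end = datetime(2022, 6, 19)
start = end - timedelta(days=30)
records = [
    {"user_id": "user_1", "timestamp": datetime(2022, 5, 1)},   # before the window
    {"user_id": "user_2", "timestamp": datetime(2022, 6, 1)},   # inside the window
    {"user_id": "user_3", "timestamp": datetime(2022, 6, 19)},  # equal to end, excluded
]
print(filter_by_time_range(records, start, end))
```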

Verify that Tecton can connect to and read data from the stream source

Call the start_stream_preview method on data_source to write incoming records from the data source to the TEMP_TABLE_TRANSLATED table. Set apply_translator=True to run the post processor function.

Note

The following command should only be run for a short period of time. The command will continuously read data from the stream source.

data_source.start_stream_preview(table_name="TEMP_TABLE_TRANSLATED", apply_translator=True, option_overrides={'initialPosition': 'earliest'})

Query the data in the table and display the output:

spark.sql("SELECT * FROM TEMP_TABLE_TRANSLATED LIMIT 10").show()

If no data is returned after running the previous command, run the command again after a short period of time.

Create a stream feature view that uses the stream source

from datetime import datetime
from datetime import timedelta

from entities import user
from transactions import transactions_stream

from tecton import stream_feature_view, FilteredSource

@stream_feature_view(
    source=FilteredSource(source=transactions_stream),
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 20),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30)
)
def stream_features(transactions_stream):
    return f"""
        SELECT
            user_id,
            transaction_id,
            category,
            amt,
            is_fraud,
            merchant,
            merch_lat,
            merch_long,
            timestamp
        FROM
            {transactions_stream}
        """

Test the stream feature view

Get the stream feature view from the workspace:

fv = ws.get_feature_view('stream_features')

Test the batch source with the stream feature view

Set the start and end times that you will use to generate feature values.

end = datetime.now()
start = end - timedelta(days=30)

Call the run method of the feature view to get feature data for the timestamp range of start to end, and display the generated feature values.

offline_features = fv.run(start_time=start, end_time=end).to_spark().limit(10)
offline_features.show()

Test the stream source with the stream feature view

Call the run_stream method on fv to write incoming records from the data source to the TEMP_TABLE table.

Note

The following command should only be run for a short period of time. The command will continuously read data from the stream source.

fv.run_stream(output_temp_table="TEMP_TABLE")

Query the data in the table and display the output:

spark.sql("SELECT * FROM TEMP_TABLE LIMIT 10").show()

If no data is returned after running the previous command, run the command again after a short period of time.

Note

Once you have successfully tested your stream source with a streaming feature view, your stream source is ready for use. For information on how a streaming feature view works, and how to define one, see Stream Feature View.