Version: 0.8

Create and Test a Data Source Using a Data Source Function

Overview

When you define a BatchSource or StreamSource object, you set the batch_config or stream_config parameter, respectively. The value of each parameter can be a config object (such as HiveConfig or KafkaConfig) or a Data Source Function.

Compared to a config object, a Data Source Function gives you more flexibility in how you connect to the underlying data source and how you transform the data retrieved from it. If you do not need that flexibility, using a config object is recommended.

Define a BatchSource using a Data Source Function

To define a BatchSource using a Data Source Function, use the @spark_batch_config decorator.

The inputs to the Data Source Function are a SparkSession and an optional filter_context:

  • The SparkSession is used to connect to your data, run any transformations, and return a Spark DataFrame.

  • When the @spark_batch_config decorator is defined with supports_time_filtering=True, the Data Source Function must take the filter_context parameter as input and implement time filtering logic. supports_time_filtering must be set to True if:

    • <data source>.get_dataframe() is called with start_time or end_time.
    • FilteredSource is used with a Data Source when defining a Feature View. The Feature View will call the Data Source Function with FilterContext, which has the start_time and end_time set.
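
The time filtering logic described above can be sketched without Spark. The following is a minimal illustration, not Tecton's implementation: StubFilterContext and filter_rows are hypothetical stand-ins for the filter_context argument and for DataFrame filtering, and the half-open window (start inclusive, end exclusive) is an assumption that mirrors the Redshift example on this page.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class StubFilterContext:
    """Hypothetical stand-in for the filter_context argument."""

    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None


def filter_rows(rows, ts_key, filter_context=None):
    """Keep rows whose timestamp falls in [start_time, end_time)."""
    if filter_context is None:
        return list(rows)
    kept = []
    for row in rows:
        ts = row[ts_key]
        # A bound that is not set leaves that side of the window open.
        if filter_context.start_time is not None and ts < filter_context.start_time:
            continue
        if filter_context.end_time is not None and ts >= filter_context.end_time:
            continue
        kept.append(row)
    return kept


rows = [
    {"user": "Noah", "timestamp": datetime(2022, 3, 15, 14, 40)},
    {"user": "Ella", "timestamp": datetime(2022, 9, 6, 0, 23)},
    {"user": "Liam", "timestamp": datetime(2022, 11, 17, 8, 21)},
]
window = StubFilterContext(
    start_time=datetime(2022, 3, 15, 14, 40),
    end_time=datetime(2022, 11, 17, 8, 21),
)
filtered = filter_rows(rows, "timestamp", window)
print([row["user"] for row in filtered])
```

In this sketch a row that falls exactly on start_time is kept and a row that falls exactly on end_time is dropped, so consecutive windows do not overlap.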

A quick self-contained example

In the following example, the data is statically defined. This is particularly useful for quick iterative feature engineering against fully mocked data in a notebook:

from tecton import spark_batch_config, BatchSource


@spark_batch_config()
def mock_data_source_function(spark):
    import pyspark.sql.functions as F

    # Mock data
    data = [
        ("Noah", "StarCluster1", "2022-03-15 14:40:00", "2022-03-25 12:49:00"),
        ("Ella", "StarCluster5", "2022-09-06 00:23:00", "2022-09-14 06:22:00"),
        ("Ella", "AndromedaZone9", "2022-03-16 08:25:00", "2022-03-27 10:55:00"),
        ("Liam", "BlackHoleEpsilon2", "2022-11-17 08:21:00", "2022-11-22 06:00:00"),
        ("Aiden", "SpaceNexus3", "2022-01-16 01:30:00", "2022-01-28 21:55:00"),
        ("Ava", "StarlightVortex7", "2022-01-10 18:30:00", "2022-01-21 12:05:00"),
        ("Mia", "SpaceNexus7", "2022-06-01 20:34:00", "2022-06-11 23:08:00"),
    ]

    cols = ["Name", "Home", "SessionStart", "SessionEnd"]

    # Cast the session columns from strings to timestamps
    df = (
        spark.createDataFrame(data, cols)
        .withColumn("SessionStart", F.col("SessionStart").cast("timestamp"))
        .withColumn("SessionEnd", F.col("SessionEnd").cast("timestamp"))
    )

    return df


user_sessions = BatchSource(name="user_sessions", batch_config=mock_data_source_function)

An example Batch Data Source Function that does not use time filtering

In the following example, csv_data_source_function reads a CSV file, casts the timestamp column to a timestamp type, and returns a DataFrame.

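@spark_batch_config()
def csv_data_source_function(spark):
    from pyspark.sql.functions import col

    # Placeholder: replace with the location of your CSV file
    csv_uri = "<CSV file URI>"

    ts_column = "created_at"
    df = spark.read.csv(csv_uri, header=True)
    # Without a schema, CSV columns are read as strings, so cast the timestamp column
    df = df.withColumn(ts_column, col(ts_column).cast("timestamp"))
    return df


csv_batch_source = BatchSource(name="csv_ds", batch_config=csv_data_source_function)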

An example Data Source Function that uses time filtering

In the following example, redshift_data_source_function is a Data Source Function that connects to a Redshift table. Because supports_time_filtering=True, the function must handle time filtering using the filter_context parameter.

@spark_batch_config(supports_time_filtering=True)
def redshift_data_source_function(spark, filter_context):
    import os

    from pyspark.sql.functions import col

    spark_format = "com.databricks.spark.redshift"
    params = {"user": "<user name>", "password": os.environ["redshift_password"]}
    endpoint = "<redshift endpoint>"
    full_connection_string = f"jdbc:redshift://{endpoint};user={params['user']};password={params['password']}"

    df_reader = (
        spark.read.format(spark_format)
        .option("url", full_connection_string)
        .option("forward_spark_s3_credentials", "true")
    )
    df_reader = df_reader.option("dbtable", "<table name>")
    df = df_reader.load()

    ts_column = "timestamp"
    df = df.withColumn(ts_column, col(ts_column).cast("timestamp"))
    # Handle time filtering: keep rows in [start_time, end_time)
    if filter_context:
        if filter_context.start_time:
            df = df.where(col(ts_column) >= filter_context.start_time)
        if filter_context.end_time:
            df = df.where(col(ts_column) < filter_context.end_time)

    return df


redshift_batch_source = BatchSource(name="redshift_ds", batch_config=redshift_data_source_function)

Define a StreamSource using a Data Source Function

To define a StreamSource using a Data Source Function, use the @spark_stream_config decorator. The Data Source Function takes a SparkSession as input and returns a Spark streaming DataFrame.

In the following example, kinesis_data_source_function connects to a Kinesis stream and returns a streaming DataFrame.

from tecton import spark_stream_config, StreamSource


def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StructField, StringType

    # Every payload field arrives as a string and is cast after parsing
    payload_schema = StructType(
        [
            StructField("user_id", StringType(), False),
            StructField("transaction_id", StringType(), False),
            StructField("category", StringType(), False),
            StructField("amt", StringType(), False),
            StructField("timestamp", StringType(), False),
        ]
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


@spark_stream_config()
def kinesis_data_source_function(spark):
    from datetime import timedelta

    options = {
        "streamName": "<stream name>",
        "roleArn": "<role ARN>",
        "region": "<region>",
        "shardFetchInterval": "30s",
        "initialPosition": "latest",
    }
    reader = spark.readStream.format("kinesis").options(**options)
    df = reader.load()
    df = raw_data_deserialization(df)
    # Use total_seconds(): timedelta(hours=24).seconds is 0
    watermark = "{} seconds".format(int(timedelta(hours=24).total_seconds()))
    df = df.withWatermark("timestamp", watermark)
    return df


transactions_stream = StreamSource(
    name="kinesis_ds",
    stream_config=kinesis_data_source_function,
    batch_config=redshift_data_source_function,
)
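
The watermark passed to withWatermark is a duration string built from a timedelta. The following standard-library sketch shows that conversion; watermark_string is a hypothetical helper name, not part of Tecton or Spark. It uses total_seconds() because the seconds attribute of a timedelta holds only the seconds left over within a day, which is 0 for an exact 24-hour delay.

```python
from datetime import timedelta


def watermark_string(delay: timedelta) -> str:
    """Format a delay as the '<n> seconds' string used for a watermark."""
    return "{} seconds".format(int(delay.total_seconds()))


delay = timedelta(hours=24)
print(delay.seconds)            # 0: the delay is stored as days=1, seconds=0
print(watermark_string(delay))  # 86400 seconds
```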

Test the Data Source in a notebook

After you have applied your data source using tecton apply, use the Tecton SDK in a notebook to test it:

import datetime

import tecton

workspace = tecton.get_workspace("<workspace name>")

# Test the batch data source. In the call to get_dataframe, start_time is specified because
# the data source redshift_ds is defined using a Data Source Function having supports_time_filtering=True.

my_batch_ds = workspace.get_data_source("redshift_ds")
# datetime.datetime takes its arguments in (year, month, day) order
df = my_batch_ds.get_dataframe(start_time=datetime.datetime(2020, 1, 1)).to_spark()
df.limit(10).show()

# Test the stream data source

my_stream_ds = workspace.get_data_source("kinesis_ds")
my_stream_ds.start_stream_preview("temp_table")
spark.sql("SELECT * FROM temp_table LIMIT 10").show()
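
When testing a source that supports time filtering, it can help to build the time window explicitly. The sketch below uses only the standard library; recent_window is a hypothetical helper, and the seven-day lookback is an arbitrary choice for illustration. The commented-out call assumes a data source object such as my_batch_ds from the snippet above.

```python
import datetime


def recent_window(end_time: datetime.datetime, days: int):
    """Return (start_time, end_time) covering the `days` before end_time."""
    return end_time - datetime.timedelta(days=days), end_time


# datetime.datetime takes its arguments in (year, month, day) order.
start_time, end_time = recent_window(datetime.datetime(2020, 1, 8), days=7)
print(start_time.isoformat(), end_time.isoformat())

# The pair could then be passed to the data source, for example:
# my_batch_ds.get_dataframe(start_time=start_time, end_time=end_time)
```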
