Skip to main content
Version: 0.7

tecton.spark_batch_config

Summary​

Declare a tecton.SparkBatchConfig for configuring a Batch Source with a Data Source Function. The function takes in a SparkSession and an optional tecton.FilterContext, if supports_time_filtering=True. Returns a DataFrame.

Example​

Example defining a Data Source Function using spark_batch_config:

from tecton import spark_batch_config


@spark_batch_config(supports_time_filtering=True)
def redshift_data_source_function(spark, filter_context):
spark_format = "com.databricks.spark.redshift"
params = {"user": "<user name>", "password": os.environ["redshift_password"]}
endpoint = "<redshift endpoint>"
full_connection_string = f"jdbc:redshift://{endpoint};user={params['user']};password={params['password']}"
df_reader = (
spark.read.format(spark_format)
.option("url", full_connection_string)
.option("forward_spark_s3_credentials", "true")
)
df_reader = df_reader.option("dbtable", "your_table_name")
df = df_reader_load()
ts_column = "timestamp"
df = df.withColumn(ts_column, col(ts_column).cast("timestamp"))
# Handle time filtering
if filter_context:
if filter_context.start_time:
df = df.where(col(ts_column) >= filter_context.start_time)
if filter_context.end_time:
df = df.where(col(ts_column) < filter_context.end_time)
return df

Parameters​

  • data_delay (Optional[timedelta]) – By default, incremental materialization jobs run immediately at the end of the batch schedule period. This parameter configures how long they wait after the end of the period before starting, typically to ensure that all data has landed. For example, if a feature view has a batch_schedule of 1 day and one of the data source inputs has data_delay=timedelta(hours=1) set, then incremental materialization jobs will run at 01:00 UTC. (Default: datetime.timedelta(0))

  • supports_time_filtering (bool) – Must be set to to True if one of the following conditions is met:

    • <data source>.get_dataframe() is called with start_time or end_time
    • A feature view wraps this Data Source with a FilteredSource

    If this parameter is set to true, Tecton passes a FilterContext object into the Data Source Function, which is expect to handle its own filtering. (Default: False)

Was this page helpful?