tecton.KafkaDSConfig

class tecton.KafkaDSConfig(kafka_bootstrap_servers, topics, raw_stream_translator, timestamp_key, default_watermark_delay_threshold=None, options=None, ssl_keystore_location=None, ssl_keystore_password_secret_id=None, ssl_truststore_location=None, ssl_truststore_password_secret_id=None, security_protocol=None)

Configuration used to reference a Kafka stream.

The KafkaDSConfig class is used to create a reference to a Kafka stream.

This class is used as an input to a StreamDataSource's stream_ds_config parameter. This class is not a Tecton Object: it is a grouping of parameters. Declaring this class alone will not register a data source. Instead, declare it as part of a StreamDataSource, which takes this configuration class instance as a parameter.

Methods

__init__

Instantiates a new KafkaDSConfig.

__init__(kafka_bootstrap_servers, topics, raw_stream_translator, timestamp_key, default_watermark_delay_threshold=None, options=None, ssl_keystore_location=None, ssl_keystore_password_secret_id=None, ssl_truststore_location=None, ssl_truststore_password_secret_id=None, security_protocol=None)

Instantiates a new KafkaDSConfig.

Parameters
  • kafka_bootstrap_servers (str) – A comma-separated list of the Kafka bootstrap server addresses. Passed directly to the Spark kafka.bootstrap.servers option.

  • topics (str) – A comma-separated list of Kafka topics to subscribe to. Passed directly to the Spark subscribe option.

  • raw_stream_translator – A Python user-defined function f(DataFrame) -> DataFrame that takes in the raw PySpark data source DataFrame and translates it to the DataFrame to be consumed by the Feature View. See an example of raw_stream_translator in the User Guide.

  • timestamp_key (str) – Name of the column containing timestamp for watermarking.

  • default_watermark_delay_threshold (Optional[str]) – (Optional) Watermark time interval, e.g: “24 hours”, used by Spark Structured Streaming to account for late-arriving data. See: https://docs.tecton.ai/v2/overviews/framework/feature_views/stream_feature_view.html#productionizing-a-stream

  • options (Optional[Dict[str, str]]) – (Optional) A map of additional Spark readStream options.

  • ssl_keystore_location (Optional[str]) – A DBFS (Databricks only) or S3 URI that points to the keystore file that should be used for SSL brokers. Note for S3 URIs, this must be configured by your Tecton representative. Example: s3://tecton-${cluster_name}/kafka-credentials/kafka_client_keystore.jks Example: dbfs:/kafka-credentials/kafka_client_keystore.jks

  • ssl_keystore_password_secret_id (Optional[str]) – The config key for the password for the keystore. Should start with SECRET_, example: SECRET_KAFKA_PRODUCTION.

  • ssl_truststore_location (Optional[str]) – A DBFS (Databricks only) or S3 URI that points to the truststore file that should be used for SSL brokers. Note for S3 URIs, this must be configured by your Tecton representative. If not provided, the default truststore for your compute provider will be used. Note that this is required for AWS-signed keystores on Databricks. Example: s3://tecton-${cluster_name}/kafka-credentials/kafka_client_truststore.jks Example: dbfs:/kafka-credentials/kafka_client_truststore.jks

  • ssl_truststore_password_secret_id (Optional[str]) – (Optional) The config key for the password for the truststore. Should start with SECRET_, example: SECRET_KAFKA_PRODUCTION.

  • security_protocol (Optional[str]) – (Optional) Security protocol passed to kafka.security.protocol. See Kafka documentation for valid values.
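The SSL parameters above are typically used together. The following sketch shows how they might be combined; the broker addresses, topic, translator, keystore paths, and secret IDs here are hypothetical placeholders, not values from this reference:

```python
from tecton import KafkaDSConfig


# Hypothetical pass-through translator; a real one would deserialize the
# raw Kafka "value" column, as in the full example at the end of this page.
def passthrough_translator(df):
    return df


# Hypothetical SSL-secured Kafka source. All paths and secret IDs below
# are placeholders and must be replaced with values configured for your
# cluster (S3 keystore URIs require setup by your Tecton representative).
secure_kafka_ds = KafkaDSConfig(
    kafka_bootstrap_servers="broker-1.example.com:9093,broker-2.example.com:9093",
    topics="transaction-events-json",
    timestamp_key="timestamp",
    raw_stream_translator=passthrough_translator,
    security_protocol="SSL",
    ssl_keystore_location="dbfs:/kafka-credentials/kafka_client_keystore.jks",
    ssl_keystore_password_secret_id="SECRET_KAFKA_PRODUCTION",
    ssl_truststore_location="dbfs:/kafka-credentials/kafka_client_truststore.jks",
    ssl_truststore_password_secret_id="SECRET_KAFKA_PRODUCTION",
)
```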

Returns

A KafkaDSConfig class instance.

Example of a KafkaDSConfig declaration:

import pyspark
from tecton import KafkaDSConfig


def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StringType, TimestampType

    PAYLOAD_SCHEMA = (
        StructType()
            .add("accountId", StringType(), False)
            .add("transaction_id", StringType(), False)
    )

    EVENT_ENVELOPE_SCHEMA = (
        StructType()
            .add("timestamp", TimestampType(), False)
            .add("payload", PAYLOAD_SCHEMA, False)
    )

    value = col("value").cast("string")
    df = df.withColumn("event", from_json(value, EVENT_ENVELOPE_SCHEMA))
    df = df.withColumn("accountId", col("event.payload.accountId"))
    df = df.withColumn("transaction_id", col("event.payload.transaction_id"))
    df = df.withColumn("timestamp", col("event.timestamp"))

    return df

# Declare Kafka DSConfig instance object that can be used as an argument in StreamDataSource
click_stream_kafka_ds = KafkaDSConfig(
    default_watermark_delay_threshold="7 days",
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_key="click_event_timestamp",
    raw_stream_translator=raw_data_deserialization,
)