KafkaConfig
Summary​
The KafkaConfig class is used to create a reference to a Kafka stream.Â
This class is used as an input to a
StreamSource
's parameter stream_config
. Declaring this configuration
class alone will not register a Data Source. Instead, declare as a part of StreamSource
that takes this configuration
class instance as a parameter.Example
import datetimeimport pysparkfrom tecton import KafkaConfigdef raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:from pyspark.sql.functions import from_json, colfrom pyspark.sql.types import StringType, TimestampTypePAYLOAD_SCHEMA = StructType().add("accountId", StringType(), False).add("transaction_id", StringType(), False)EVENT_ENVELOPE_SCHEMA = StructType().add("timestamp", TimestampType(), False).add("payload", PAYLOAD_SCHEMA, False)value = col("value").cast("string")df = df.withColumn("event", from_json(value, EVENT_ENVELOPE_SCHEMA))df = df.withColumn("accountId", col("event.payload.accountId"))df = df.withColumn("transaction_id", col("event.payload.transaction_id"))df = df.withColumn("timestamp", col("event.timestamp"))return df# Declare Kafka Config instance object that can be used as an argument in StreamSourceclick_stream_kafka_ds = KafkaConfig(kafka_bootstrap_servers="127.0.0.1:12345",topics="click-events-json",timestamp_field="click_event_timestamp",post_processor=raw_data_deserialization,)
Methods​
Name | Description |
---|---|
__init__(...) | Instantiates a new KafkaConfig. |
__init__(...)​
Instantiates a new KafkaConfig.Parameters
kafka_bootstrap_servers
(str
) - A comma-separated list of the Kafka bootstrap server addresses. Passed directly to the Sparkkafka.bootstrap.servers
option.topics
(str
) - A comma-separated list of Kafka topics to subscribe to. Passed directly to the Sparksubscribe
option.post_processor
() - Python user defined functionf(DataFrame) -> DataFrame
that takes in raw Pyspark data source DataFrame and translates it to the DataFrame to be consumed by the Feature View. See an example of post_processor in the User Guide.timestamp_field
(str
) - Name of the column containing timestamp for watermarking.watermark_delay_threshold
(timedelta
) - Watermark time interval, e.g: timedelta(hours=36), used by Spark Structured Streaming to account for late-arriving data. See: Productionizing a Stream. Default:24h
options
(Optional
[Dict
[str
,str
]]) - A map of additional Spark readStream options Default:None
ssl_keystore_location
(Optional
[str
]) - An DBFS (Databricks only) or S3 URI that points to the keystore file that should be used for SSL brokers. Note for S3 URIs, this must be configured by your Tecton representative. Example:s3://tecton-<deployment name>/kafka-credentials/kafka_client_keystore.jks
Example:dbfs:/kafka-credentials/kafka_client_keystore.jks
Default:None
ssl_keystore_password_secret_id
(Optional
[str
]) - The config key for the password for the Keystore. Should start withSECRET_
, example:SECRET_KAFKA_PRODUCTION
. Default:None
ssl_truststore_location
(Optional
[str
]) - An DBFS (Databricks only) or S3 URI that points to the truststore file that should be used for SSL brokers. Note for S3 URIs, this must be configured by your Tecton representative. If not provided, the default truststore for your compute provider will be used. Note that this is required for AWS-signed keystores on Databricks. Example:s3://tecton-<deployment name>/kafka-credentials/kafka_client_truststore.jks
Example:dbfs:/kafka-credentials/kafka_client_truststore.jks
Default:None
ssl_truststore_password_secret_id
(Optional
[str
]) - The config key for the password for the Truststore. Should start withSECRET_
, example:SECRET_KAFKA_PRODUCTION
. Default:None
security_protocol
(Optional
[str
]) - Security protocol passed to kafka.security.protocol. See Kafka documentation for valid values. Default:None
deduplication_columns
(Optional
[List
[str
]]) - Columns in the stream data that uniquely identify data records. Used for de-duplicating. Spark will drop rows if there are duplicates in the deduplication_columns, but only within the watermark delay window. Default:None