HiveConfig
Summary
The HiveConfig class is used to create a reference to a Hive table.
This class is used as an input to a BatchSource's batch_config parameter. Declaring this configuration
class alone will not register a Data Source. Instead, pass the configuration class instance as a
parameter to a BatchSource.
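As a sketch of that registration pattern (the BatchSource name below is a hypothetical value chosen for illustration; the database, table, and timestamp field are taken from the Example section of this page):

```python
from tecton import BatchSource, HiveConfig

# On its own, this HiveConfig registers nothing.
temperatures_config = HiveConfig(
    database='global_temperatures',
    table='us_cities',
    timestamp_field='timestamp',
)

# Passing the config to a BatchSource is what declares the Data Source.
temperatures_source = BatchSource(
    name='us_city_temperatures',  # hypothetical name for illustration
    batch_config=temperatures_config,
)
```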
Attributes
data_delay: This attribute is the same as the data_delay parameter of the __init__ method. See below.
Methods
__init__(...)​
Instantiates a new HiveConfig.
See the Example section below for a sample HiveConfig declaration.
Parameters
table: str
  A table registered in Hive MetaStore.
database: str
  A database registered in Hive MetaStore.
timestamp_field: Optional[str] = None
  The timestamp column in this data source that should be used for time-based filtering. Required unless this source is used in Feature Views only with unfiltered().
timestamp_format: Optional[str] = None
  Format of a string-encoded timestamp column (e.g. "yyyy-MM-dd'T'hh:mm:ss.SSS'Z'"). If the timestamp string cannot be parsed with this format, Tecton will fall back and attempt to use the default timestamp parser.
datetime_partition_columns: Optional[List[DatetimePartitionColumn]] = None
  List of DatetimePartitionColumn that the raw data is partitioned by, otherwise None.
post_processor: Optional[Callable] = None
  Python user-defined function f(DataFrame) -> DataFrame that takes in the raw PySpark data source DataFrame and translates it to the DataFrame to be consumed by the Feature View.
data_delay: timedelta = 0:00:00
  By default, incremental materialization jobs run immediately at the end of the batch schedule period. This parameter configures how long they wait after the end of the period before starting, typically to ensure that all data has landed. For example, if a Feature View has a batch_schedule of 1 day and one of the data source inputs has data_delay=timedelta(hours=1) set, then incremental materialization jobs will run at 01:00 UTC.
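The data_delay timing described above can be sketched with plain Python datetimes (the helper function is hypothetical, for illustration only, and not part of the Tecton API):

```python
from datetime import datetime, timedelta, timezone

def job_start_time(period_end: datetime, data_delay: timedelta) -> datetime:
    # A materialization job for a batch period starts once the period has
    # ended AND the configured data_delay has elapsed.
    return period_end + data_delay

# A daily batch_schedule period ending at midnight UTC...
period_end = datetime(2023, 1, 2, 0, 0, tzinfo=timezone.utc)

# ...with data_delay=timedelta(hours=1) yields a job start of 01:00 UTC.
print(job_start_time(period_end, timedelta(hours=1)).isoformat())
# → 2023-01-02T01:00:00+00:00
```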
Returns
A HiveConfig class instance.
Example
from tecton import HiveConfig
import pyspark

def convert_temperature(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import DoubleType

    # Convert the incoming PySpark DataFrame temperature Celsius to Fahrenheit
    udf_convert = udf(lambda x: x * 1.8 + 32.0, DoubleType())
    converted_df = df.withColumn("Fahrenheit", udf_convert(col("Temperature"))).drop("Temperature")
    return converted_df

# declare a HiveConfig instance, which can be used as a parameter to a BatchSource
batch_config = HiveConfig(
    database='global_temperatures',
    table='us_cities',
    timestamp_field='timestamp',
    post_processor=convert_temperature,
)
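The Celsius-to-Fahrenheit arithmetic inside that post_processor's UDF is plain Python, so it can be sanity-checked without a Spark session (a minimal sketch; the standalone function below is an illustration, not part of the Tecton or PySpark API):

```python
def celsius_to_fahrenheit(c: float) -> float:
    # Same per-row arithmetic the UDF in the post_processor applies.
    return c * 1.8 + 32.0

print(celsius_to_fahrenheit(0.0))    # freezing point → 32.0
print(celsius_to_fahrenheit(100.0))  # boiling point  → 212.0
```

Unit-testing post_processor logic this way, before wiring it into a HiveConfig, keeps debugging out of the materialization job.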