FileConfig
Summary

Configuration used to reference a file or directory (S3, etc.)
The FileConfig class is used to create a reference to a file or directory of files in S3, HDFS, or DBFS.
The schema of the data source is inferred from the underlying file(s). It can also be modified using the
post_processor parameter.
This class is used as an input to a BatchSource's batch_config parameter. Declaring this
configuration class alone will not register a data source. Instead, declare it as part of a
BatchSource that takes this configuration class instance as a parameter.
important
If your files are partitioned, provide the path to the root folder. For
example: uri = "s3://<bucket-name>/<root-folder>/"
Tecton will use Spark partition discovery to find all partitions and infer the schema.
When reading a highly partitioned file source, Tecton recommends setting the
schema_uri parameter to speed up schema inference. For more details, see the Tecton
documentation on schema inference.
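To build intuition for how Spark partition discovery works, here is a rough, stdlib-only sketch (this is illustrative only, not Spark's or Tecton's actual implementation) of how partition columns can be derived from Hive-style "key=value" path segments:

```python
# Illustrative sketch: infer partition-column values from Hive-style
# "key=value" segments in file paths, the convention Spark partition
# discovery relies on.
def discover_partitions(paths):
    partitions = []
    for path in paths:
        cols = {}
        for segment in path.split("/"):
            if "=" in segment:
                key, _, value = segment.partition("=")
                cols[key] = value
        partitions.append(cols)
    return partitions

paths = [
    "s3://bucket/root/year=2023/month=01/part-0000.parquet",
    "s3://bucket/root/year=2023/month=02/part-0000.parquet",
]
# Each file contributes its partition-column values,
# e.g. {"year": "2023", "month": "01"}
print(discover_partitions(paths))
```

Because every file under the root folder must be listed to discover partitions, this scan is what makes schema inference slow for highly partitioned sources, and why pointing schema_uri at a single file or subpath avoids the full listing.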
Attributes
| Name | Data Type | Description |
|---|---|---|
| data_delay | timedelta | Returns the duration that materialization jobs wait after the batch_schedule before starting, typically to ensure that all data has landed. |
Methods
| Name | Description |
|---|---|
| __init__(...) | Instantiates a new FileConfig. |
__init__(...)

Instantiates a new FileConfig.
See the Example section below for a sample FileConfig declaration.
Parameters

- uri (str): S3 or HDFS path to file(s).
- file_format (str): File format: "json", "parquet", or "csv".
- timestamp_field (Optional[str], default None): The timestamp column in this data source that should be used for time-based filtering. Required unless this source is used in Feature Views only with unfiltered().
- timestamp_format (Optional[str], default None): Format of a string-encoded timestamp column (e.g. "yyyy-MM-dd'T'hh:mm:ss.SSS'Z'"). If the timestamp string cannot be parsed with this format, Tecton will fall back and attempt to use the default timestamp parser.
- datetime_partition_columns (Optional[List[DatetimePartitionColumn]], default None): List of DatetimePartitionColumn objects that the raw data is partitioned by, otherwise None.
- post_processor (Optional[Callable], default None): Python user-defined function f(DataFrame) -> DataFrame that takes in the raw PySpark data source DataFrame and translates it to the DataFrame to be consumed by the Feature View.
- schema_uri (Optional[str], default None): A file or subpath of uri that can be used for fast schema inference. This is useful for speeding up plan computation for highly partitioned data sources containing many files.
- schema_override (Optional[pyspark.sql.types.StructType], default None): A pyspark.sql.types.StructType object that will be used as the schema when reading from the file. If omitted, the schema will be inferred automatically.
- data_delay (timedelta, default timedelta(0)): Configures how long materialization jobs wait after the end of the batch schedule period before starting, typically to ensure that all data has landed. For example, if a Feature View has a batch_schedule of 1 day and one of its data source inputs has data_delay=timedelta(hours=1) set, then incremental materialization jobs will run at 01:00 UTC.
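To make the data_delay timing concrete, here is a small stdlib-only sketch (the job_start_time helper is hypothetical, not part of the Tecton API) of the arithmetic described above:

```python
from datetime import datetime, timedelta

def job_start_time(period_end, data_delay):
    """Materialization jobs begin data_delay after the batch period ends."""
    return period_end + data_delay

# With a batch_schedule of 1 day, the period covering 2023-05-01 ends at
# 2023-05-02 00:00 UTC; with data_delay=timedelta(hours=1), the
# incremental job starts at 01:00 UTC.
period_end = datetime(2023, 5, 2, 0, 0)
print(job_start_time(period_end, timedelta(hours=1)))  # 2023-05-02 01:00:00
```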
Returns

A FileConfig class instance.

Example
```python
from tecton import FileConfig, BatchSource

# Define a post-processor function to convert the temperature from Celsius to Fahrenheit
def convert_temperature(df):
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import DoubleType

    udf_convert = udf(lambda x: x * 1.8 + 32.0, DoubleType())
    converted_df = df.withColumn("Fahrenheit", udf_convert(col("Temperature"))).drop(
        "Temperature"
    )
    return converted_df

# Declare a FileConfig, which can be used as a parameter to a `BatchSource`
ad_impressions_file_config = FileConfig(
    uri="s3://tecton.ai.public/data/ad_impressions_sample.parquet",
    file_format="parquet",
    timestamp_field="timestamp",
    post_processor=convert_temperature,
)

# This FileConfig can then be included as a parameter for a BatchSource declaration.
# For example,
ad_impressions_batch = BatchSource(
    name="ad_impressions_batch", batch_config=ad_impressions_file_config
)
```