pyarrow_batch_config
Summary
Declare a tecton.PyArrowBatchConfig for configuring a Batch Source with a Data Source Function.
The function takes in an optional tecton.FilterContext, if supports_time_filtering=True.
A full example defining a Data Source Function with pyarrow_batch_config appears in the Example section below; a minimal sketch follows here.
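For instance, a minimal Data Source Function might look like the sketch below (illustrative only: the in-memory table and its columns are invented, and it assumes the decorator can be applied with no arguments when the defaults suffice):

from tecton import pyarrow_batch_config

@pyarrow_batch_config()
def minimal_data_source_function():
    import pyarrow as pa

    # Return an in-memory pyarrow.Table; any pyarrow.Table or
    # pyarrow.RecordBatchReader is an acceptable return value.
    return pa.table({"user_id": ["u1", "u2"], "created_at": [1, 2]})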
Parameters
data_delay: Optional[datetime.timedelta] = 0:00:00
By default, incremental materialization jobs run immediately at the end of the batch schedule period. This parameter configures how long they wait after the end of the period before starting, typically to ensure that all data has landed. For example, if a feature view has a batch_schedule of 1 day and one of the data source inputs has data_delay=timedelta(hours=1) set, then incremental materialization jobs will run at 01:00 UTC.

supports_time_filtering: Optional[bool] = False
When set to True, the Data Source Function must take the filter_context parameter and implement time filtering logic. supports_time_filtering must be set to True if <data source>.get_dataframe() is called with start_time or end_time, or if using time filtering with a Data Source when defining a FeatureView, which has time filtering enabled by default. To use a Data Source without time filtering, call .unfiltered() on the Data Source. The FeatureView will call the Data Source Function with the tecton.FilterContext, which has the start_time and end_time set. (A usage sketch follows this list.)

secrets: Optional[Dict[str, Union[Secret, str]]] = None
A dictionary of Secret references that will be resolved and provided to the Data Source Function at runtime. During local development and testing, strings may be used instead of Secret references.
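To illustrate how the supports_time_filtering rules above apply in practice, here is a hedged usage sketch; my_source stands in for a hypothetical BatchSource built on such a Data Source Function:

from datetime import datetime

# Requires supports_time_filtering=True on the config; Tecton passes a
# tecton.FilterContext carrying these bounds to the Data Source Function.
df = my_source.get_dataframe(
    start_time=datetime(2023, 1, 1),
    end_time=datetime(2023, 2, 1),
)

# To consume the source without time filtering, e.g. in a Feature View
# definition, call my_source.unfiltered() instead.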
Returns
Returns a pyarrow.Table or pyarrow.RecordBatchReader.
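For large datasets, returning a pyarrow.RecordBatchReader lets the caller stream record batches instead of materializing the whole table at once; a minimal sketch (the dataset path is illustrative):

from tecton import pyarrow_batch_config

@pyarrow_batch_config()
def streaming_data_source_function():
    import pyarrow.dataset as ds

    dataset = ds.dataset("s3://my-bucket/path/to/data", format="parquet")
    # scanner().to_reader() produces a pyarrow.RecordBatchReader that
    # yields record batches lazily rather than loading everything at once.
    return dataset.scanner().to_reader()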
Example

from tecton import pyarrow_batch_config, Secret

@pyarrow_batch_config(
    supports_time_filtering=True,
    secrets={"s3_bucket": Secret(scope="dev", key="user_data_s3_bucket")},
)
def parquet_data_source_function(filter_context, secrets):
    import pyarrow.parquet as pq
    from pyarrow.fs import S3FileSystem

    # Handle time filtering, ideally using partition keys.
    filters = None
    if filter_context:
        filters = []
        if filter_context.start_time:
            filters.append(("created_at", ">=", filter_context.start_time.replace(tzinfo=None)))
        if filter_context.end_time:
            filters.append(("created_at", "<", filter_context.end_time.replace(tzinfo=None)))

    s3_bucket = secrets["s3_bucket"]
    dataset = pq.ParquetDataset(
        f"s3://{s3_bucket}/path/to/data.pq",
        filesystem=S3FileSystem(),
        use_legacy_dataset=False,
        # Guard against both an unset (None) and an empty filter list.
        filters=filters if filters else None,
    )
    return dataset.read()
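The decorated function is then passed to a source via its batch_config parameter; a hedged wiring sketch (the source name is illustrative):

from tecton import BatchSource

parquet_source = BatchSource(
    name="parquet_source",
    batch_config=parquet_data_source_function,
)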