Skip to main content
Version: Beta 🚧

pandas_batch_config

Summary​

Declare a tecton.PandasBatchConfig for configuring a Batch Source with a Data Source Function. The function takes in an optional tecton.FilterContext, if supports_time_filtering=True.

Parameters

  • data_delay (Optional[datetime.timedelta]) - By default, incremental materialization jobs run immediately at the end of the batch schedule period. This parameter configures how long they wait after the end of the period before starting, typically to ensure that all data has landed. For example, if a feature view has a batch_schedule of 1 day and one of the data source inputs has data_delay=timedelta(hours=1) set, then incremental materialization jobs will run at 01:00 UTC. Default: 0:00:00

  • supports_time_filtering (Optional[bool]) - When set to True, the Data Source Function must take the filter_context parameter and implement time filtering logic. supports_time_filtering must be set to True if <data source>.get_dataframe() is called with start_timeor end_time. supports_time_filtering must also be set to True if using tecton.declarative.FilteredSource with a Data Source when defining a FeatureView. The FeatureView will call the Data Source Function with the tecton.FilterContext, which has the start_time and end_time set. Default: false

  • secrets (Union[Dict[str,Union[Secret, str]], NoneType]) - A dictionary of Secret references that will be resolved and provided to the Data Source Function at runtime. During local development and testing, strings may be used instead Secret references. Default: None

Returns

Returns a pandas.DataFrame.

Example

from tecton import pandas_batch_config, Secret
@pandas_batch_config(
supports_time_filtering=True,
secrets={"s3_bucket": Secret(scope="dev", key="user_data_s3_bucket")}
)
def parquet_data_source_function(filter_context, secrets):
import pyarrow.parquet as pq
from pyarrow.fs import S3FileSystem
filters=None
# Handle time filtering, ideally using partition keys
if filter_context:
filters = []
if filter_context.start_time:
filters.append(("created_at", ">=", filter_context.start_time.replace(tzinfo=None)))
if filter_context.end_time:
filters.append(("created_at", "<", filter_context.end_time.replace(tzinfo=None)))
s3_bucket = secrets["s3_bucket"]
dataset = pq.ParquetDataset(
f"s3://{s3_bucket}/path/to/data.pq",
filesystem=S3FileSystem(),
use_legacy_dataset=False,
filters=filters if len(filters) > 0 else None
)
return dataset.read_pandas().to_pandas()

Was this page helpful?