Version: 0.4

tecton apply Runs Slowly for FileConfigs with Partitioned S3 Data

Scope

This article applies when you configure a BatchSource for partitioned S3 data using a FileConfig. It covers Parquet, CSV, and JSON files.

Cause

The likely cause of tecton apply running slowly is Spark’s partition discovery mechanism, which is slow when inferring schemas for S3 prefixes with many partitions. Every time a new data source is added, tecton apply needs to invoke Spark to obtain the schema from a DataFrame.

For a partitioned data source, Spark will recurse through every partition nested beneath the S3 URI you provide. Read more about Spark partition discovery.

For example, suppose you have some parquet data in S3:

from tecton import FileConfig, BatchSource

transactions_ds = FileConfig(uri="s3://my-data/transactions/", file_format="parquet", timestamp_field="timestamp")


transactions_batch = BatchSource(name="transactions_batch", batch_config=transactions_ds)

For the data source above, Spark will recursively do a list operation on every partition under s3://my-data/transactions/. If, for example, the data is partitioned by year, month, and day, 2 years of data will require Spark to do a list operation on 2 years x 12 months x approx 30 days = approx 720 partitions. If the data is also partitioned by hour, this increases to 720 x 24 = approx 17,280 partitions.
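The arithmetic above can be sketched in a few lines of Python. This is only an estimate of leaf partition directories under the same assumptions as the example (12 months per year, roughly 30 days per month); `partition_dirs` is a hypothetical helper, not part of Tecton or Spark.

```python
def partition_dirs(years, months_per_year=12, days_per_month=30, hours_per_day=None):
    """Estimate the number of leaf partition directories Spark has to list."""
    count = years * months_per_year * days_per_month
    if hours_per_day is not None:
        # Hourly partitioning multiplies the number of daily partitions.
        count *= hours_per_day
    return count


print(partition_dirs(2))                    # 720
print(partition_dirs(2, hours_per_day=24))  # 17280
```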

You can verify this slow behavior by simply running spark.read followed by printSchema() in a notebook:

spark.read.parquet("s3://my-data/transactions/").printSchema()

Try doing this for the S3 prefix you’re finding slow. Be sure to replace spark.read.parquet with spark.read.json or spark.read.csv as needed.
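To put a number on the slowdown, one option is to time the read. The following is a minimal sketch: `time_call` is a hypothetical helper (not part of Tecton or Spark), and the commented usage assumes a notebook with an active `spark` session.

```python
import time


def time_call(fn):
    """Run fn() once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


# Example usage in a notebook where `spark` is already defined (assumption):
#   schema, seconds = time_call(
#       lambda: spark.read.parquet("s3://my-data/transactions/").schema
#   )
#   print(f"Schema inference took {seconds:.1f}s")
```

Comparing the timing for the full prefix against the timing for a single partition directory should show whether partition discovery is the bottleneck.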

Resolution

Option 1: Use an AWS Glue catalog table

Connect your Tecton instance to AWS Glue and create a Glue table that points to your data in S3. Since Spark is reading the data through a metastore, the schema will be instantly available to Spark without recursing through data in S3.

This option is recommended for any data that is depended upon in production use cases since Glue enforces a strong schema contract.
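As a rough sketch of what this could look like, a Glue-backed source would typically be declared with a HiveConfig instead of a FileConfig. The database and table names below are hypothetical placeholders, and the exact HiveConfig parameters should be checked against the SDK reference for your Tecton version.

```python
from tecton import HiveConfig, BatchSource

# Hypothetical Glue database and table names; replace with your own.
transactions_hive = HiveConfig(
    database="my_glue_db",
    table="transactions",
    timestamp_field="timestamp",
)

transactions_batch = BatchSource(name="transactions_batch", batch_config=transactions_hive)
```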

Option 2: Decrease the granularity of partitions

Do you need a high granularity of partitions for the volume of your data? If the total data volume is low, consider using fewer partitions. Fewer partitions will make all spark.read operations faster since there will be fewer directories to recurse through.

Option 3: Specify schema_uri to use a single file for schema inference

FileConfig supports a schema_uri field, which allows you to point to a single file within your S3 data that will be used as the source of truth for its schema. For example, suppose your data is located in s3://my-data/transactions/ and it is partitioned by year, month, and day (e.g. s3://my-data/transactions/year=2022/month=11/day=23/).

When schema_uri points to a single file, Spark infers the data schema from that file alone instead of listing every partition.

from tecton import FileConfig, BatchSource

SCHEMA_FILE_URI = "s3://my-data/transactions/year=2022/month=11/day=23/67d652.parquet"

transactions_ds = FileConfig(
    uri="s3://my-data/transactions/",
    schema_uri=SCHEMA_FILE_URI,
    file_format="parquet",
    timestamp_field="timestamp",
)


transactions_batch = BatchSource(name="transactions_batch", batch_config=transactions_ds)

Internally, this causes Spark to infer the schema using the "basePath" option, which tells Spark to treat the partition subpaths between "basePath" and the specified URI as schema columns.

spark.read \
    .option("basePath", "s3://my-data/transactions/") \
    .parquet("s3://my-data/transactions/year=2022/month=11/day=23/67d652.parquet")
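To illustrate which columns the partition subpaths contribute, here is a small pure-Python sketch that extracts the `key=value` segments between a base path and a file URI. It only mimics the idea; `partition_columns` is a hypothetical helper, not Spark's actual implementation, and it keeps values as strings whereas Spark by default also infers a type for each partition column.

```python
def partition_columns(base_path, file_uri):
    """Return the key=value path segments between base_path and the file as a dict."""
    if not file_uri.startswith(base_path):
        raise ValueError("file_uri must be located under base_path")
    relative = file_uri[len(base_path):].strip("/")
    segments = relative.split("/")[:-1]  # drop the file name itself
    columns = {}
    for segment in segments:
        key, sep, value = segment.partition("=")
        if sep:  # keep only Hive-style key=value directories
            columns[key] = value
    return columns


cols = partition_columns(
    "s3://my-data/transactions/",
    "s3://my-data/transactions/year=2022/month=11/day=23/67d652.parquet",
)
print(cols)  # {'year': '2022', 'month': '11', 'day': '23'}
```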
