Connect to Iceberg
To run feature pipelines based on data in an Iceberg table, Tecton must be configured with permissions to read your Iceberg tables. This guide shows how to use PyIceberg to connect to those tables and to validate that Tecton can reach your data source.
Iceberg support is only available starting in Tecton 1.1.0.
Configure the Data Source
Tecton recommends using the PyIceberg library to connect to Iceberg tables. The configuration and the recommended authentication scheme depend on where the Iceberg table lives. PyIceberg supports reading from file systems such as S3 and GCS, as well as catalogs such as Hive and Glue.
To configure your data source, define a pyarrow_batch_config as shown below.
import tecton
from tecton import BatchSource, pyarrow_batch_config


# Declare a pyarrow_batch_config that connects to your Iceberg table
@pyarrow_batch_config()
def iceberg_config():
    from pyiceberg.catalog import load_catalog

    catalog_type = "<your catalog>"
    catalog_configurations = {"type": catalog_type}  # add any configuration params that apply
    table_name = "<your table>"

    catalog = load_catalog(catalog_type, **catalog_configurations)
    tbl = catalog.load_table(table_name)
    return tbl.scan().to_arrow()


# Use in the BatchSource
iceberg_ds = BatchSource(name="iceberg_ds", batch_config=iceberg_config)

# Test the data source connection
iceberg_ds.get_dataframe().to_pandas().head(10)
When developing in a local or notebook environment, you may also need to install pyiceberg.
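For example, you can install PyIceberg with the PyArrow and Glue extras; choose the extras that match your catalog and file system:

pip install "pyiceberg[pyarrow,glue]"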
Configure the Materialization Environment
Iceberg is supported by default in Tecton core environments >= 1.1.0, such as tecton-rift-core-1.1.0. The core environment installs the pyiceberg package and the dependencies necessary for Arrow file I/O and Glue catalog support.
If you'd like to use a different library to connect to your Iceberg table or need support for other catalog implementations, please create a custom environment.
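As an illustration, a custom environment's requirements file might add PyIceberg with a different catalog extra; the Hive extra below is an assumption, so substitute whichever catalog implementation you need:

# Illustrative requirements addition for a custom environment
# (the hive extra is an example; pick the extras your catalog needs)
pyiceberg[pyarrow,hive]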
Example using a Glue Catalog with Iceberg
Add permission for Tecton to connect to AWS Glue
Tecton materialization jobs will need the following permissions to connect to the Glue Catalog:
- Glue Permissions
- glue:GetDatabase
- glue:GetDatabases
- glue:GetTable
- glue:GetTables
- glue:GetPartition
- glue:GetPartitions
- S3 Permissions
- s3:ListBucket
- s3:GetObject
To grant Tecton access to your S3 data source, configure an S3 bucket policy that gives your Tecton account's AWS role read-only access to the data source, as in the following example. Contact Tecton Support if you do not know your TECTON_ACCOUNT_ARN.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "TectonS3DataSourceGet",
      "Effect": "Allow",
      "Principal": {
        "AWS": "{TECTON_ACCOUNT_ARN}"
      },
      "Action": "s3:GetObject",
      "Resource": "{YOUR_S3_DATA_SOURCE_ARN}/*"
    },
    {
      "Sid": "TectonS3DataSourceList",
      "Effect": "Allow",
      "Principal": {
        "AWS": "{TECTON_ACCOUNT_ARN}"
      },
      "Action": "s3:ListBucket",
      "Resource": "{YOUR_S3_DATA_SOURCE_ARN}"
    }
  ]
}
To grant Tecton access to your Glue catalog, add the following resource policy to the target Glue Data Catalog; it grants read access to the AWS instance profile used during Rift materialization jobs. Contact Tecton Support if your Rift materialization jobs run in Tecton's account and you want to access Glue in your AWS account.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "GlueAllowTectonAccess",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<YOUR_ACCOUNT_ID>:instance-profile/tecton-rift-compute"
      },
      "Action": [
        "glue:GetDatabase",
        "glue:GetDatabases",
        "glue:GetPartition",
        "glue:GetPartitions",
        "glue:GetTable",
        "glue:GetTables"
      ],
      "Resource": "arn:aws:glue:<aws-region-target-glue-catalog>:<aws-account-id-target-glue-catalog>:*"
    }
  ]
}
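Assuming the policy above is saved as glue-policy.json, one way to attach it to the target account's Glue Data Catalog is with the AWS CLI:

aws glue put-resource-policy --policy-in-json file://glue-policy.json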
Declare a Batch Source
Below is an example of how to configure a Tecton data source to read from an Iceberg table backed by an AWS Glue Catalog.
import tecton
from tecton import BatchSource, pyarrow_batch_config


@pyarrow_batch_config(supports_time_filtering=True)
def iceberg_config(filter_context):
    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import And, GreaterThanOrEqual, LessThan

    catalog_type = "glue"
    catalog_configurations = {"type": catalog_type}
    table_name = "my_table"

    catalog = load_catalog(catalog_type, **catalog_configurations)
    tbl = catalog.load_table(table_name)

    # Push Tecton's time-range filter down into the Iceberg scan when one is provided
    filter_expression = None
    if filter_context and (filter_context.start_time or filter_context.end_time):
        start_filter = None
        end_filter = None
        if filter_context.start_time:
            start_filter = GreaterThanOrEqual("timestamp_column", filter_context.start_time.replace(tzinfo=None))
        if filter_context.end_time:
            end_filter = LessThan("timestamp_column", filter_context.end_time.replace(tzinfo=None))
        filter_expression = (
            And(start_filter, end_filter) if start_filter and end_filter else start_filter or end_filter
        )

    scanned_table = tbl.scan(row_filter=filter_expression) if filter_expression else tbl.scan()
    return scanned_table.to_arrow()


# Use in the BatchSource
iceberg_ds = BatchSource(name="iceberg_ds", batch_config=iceberg_config)

# Test the data source connection
iceberg_ds.get_dataframe().to_pandas().head(10)
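To verify that supports_time_filtering works end to end, you can also fetch a bounded window; the one-week range below is just an illustration:

from datetime import datetime, timedelta

# Request a bounded window; Tecton passes these bounds to filter_context,
# which the config above pushes down into the Iceberg scan
end = datetime.now()
start = end - timedelta(days=7)
iceberg_ds.get_dataframe(start_time=start, end_time=end).to_pandas().head(10)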