Connect to PostgreSQL
To run feature pipelines based on data in a PostgreSQL table, Tecton needs permission to read from your PostgreSQL database. The following guide shows how to connect to your PostgreSQL database and validate that Tecton is able to connect to your data source.
PostgreSQL support is only available starting in Tecton 1.1.0.
Configure the Data Source
Below are examples of how to define a Batch Source that reads data from a PostgreSQL database. To configure your data source, define either a pyarrow_batch_config or a pandas_batch_config. For the best performance, Tecton recommends using Arrow and the pyarrow_batch_config when reading data from your database.
Using pyarrow_batch_config
The following example shows how to use the pyarrow_batch_config and the Arrow PostgreSQL driver to read data from a PostgreSQL database.
from tecton import BatchSource, Secret, pyarrow_batch_config

# Declare a pyarrow_batch_config that connects to your PostgreSQL database.
@pyarrow_batch_config(
    secrets={
        "user": Secret(scope="postgres", key="postgres-user"),
        "password": Secret(scope="postgres", key="postgres-password"),
    },
    supports_time_filtering=True,
)
def postgres_config(secrets, filter_context):
    import adbc_driver_postgresql.dbapi

    # Build the PostgreSQL connection URI. The secrets dict is keyed by the
    # names declared in the decorator above ("user" and "password").
    user = secrets["user"]
    password = secrets["password"]
    host = "<your host>"
    database_name = "<db name>"
    uri = f"postgresql://{user}:{password}@{host}/{database_name}"

    # Define your SQL query
    query = "SELECT * FROM <table name>"

    # Filter by timestamp column when Tecton provides a time range
    if filter_context:
        filters = []
        if filter_context.start_time:
            filters.append(f"timestamp_column >= '{filter_context.start_time.replace(tzinfo=None)}'::timestamp")
        if filter_context.end_time:
            filters.append(f"timestamp_column < '{filter_context.end_time.replace(tzinfo=None)}'::timestamp")
        if filters:
            query += " WHERE " + " AND ".join(filters)

    # Execute the query and return a pyarrow Table
    with adbc_driver_postgresql.dbapi.connect(uri) as conn, conn.cursor() as curr:
        curr.execute(query)
        return curr.fetch_arrow_table()

# Use in the BatchSource
postgres_ds = BatchSource(name="postgres_ds", batch_config=postgres_config)

# Test data source connection
postgres_ds.get_dataframe().to_pandas().head(10)
Use Tecton Secrets to pass in the user and password for the PostgreSQL connection, or any other credentials needed to authenticate. At runtime, the secrets argument passed to your function is a dictionary keyed by the names declared in the decorator's secrets parameter ("user" and "password" above), with each value resolved to the secret's string value.
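If the postgres scope and its keys do not exist yet, they can be created with the Tecton CLI's secrets commands. The commands below are a sketch only; the exact subcommands and flags are an assumption, so verify them against the Tecton Secrets documentation for your CLI version.
# Illustrative — verify subcommands and flags against the Secrets docs
tecton secrets create-scope --scope postgres
tecton secrets put --scope postgres --key postgres-user
tecton secrets put --scope postgres --key postgres-password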
In addition to the Arrow PostgreSQL driver, connector-x and turbodbc are other library options for connecting to PostgreSQL and reading data in with Arrow.
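As a minimal sketch, the same config could use connector-x, which can return query results directly as a pyarrow Table. This reuses the imports from the example above; the host, database, table, and timestamp_column names are placeholders.
# Sketch: the same data source using connector-x instead of ADBC
@pyarrow_batch_config(
    secrets={
        "user": Secret(scope="postgres", key="postgres-user"),
        "password": Secret(scope="postgres", key="postgres-password"),
    },
    supports_time_filtering=True,
)
def postgres_connectorx_config(secrets, filter_context):
    import connectorx as cx

    uri = f"postgresql://{secrets['user']}:{secrets['password']}@<your host>/<db name>"
    query = "SELECT * FROM <table name>"
    if filter_context and filter_context.start_time:
        # Apply the lower bound of the materialization window, if provided
        query += f" WHERE timestamp_column >= '{filter_context.start_time.replace(tzinfo=None)}'::timestamp"
    # connector-x reads the result set directly into Arrow memory
    return cx.read_sql(uri, query, return_type="arrow")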
Using pandas_batch_config
The following example shows how to use the pandas_batch_config and the psycopg library to read data from a PostgreSQL database.
from tecton import BatchSource, Secret, pandas_batch_config

# Declare a pandas_batch_config that connects to your PostgreSQL database.
@pandas_batch_config(
    secrets={
        "user": Secret(scope="postgres", key="postgres-user"),
        "password": Secret(scope="postgres", key="postgres-password"),
    },
    supports_time_filtering=True,
)
def postgres_config(secrets, filter_context):
    import pandas as pd
    import psycopg

    # Build the PostgreSQL connection URI. The secrets dict is keyed by the
    # names declared in the decorator above ("user" and "password").
    user = secrets["user"]
    password = secrets["password"]
    host = "<your host>"
    database_name = "<db name>"
    uri = f"postgresql://{user}:{password}@{host}/{database_name}"

    # Define your SQL query
    query = "SELECT * FROM your_table"

    # Filter by timestamp column when Tecton provides a time range
    if filter_context:
        filters = []
        if filter_context.start_time:
            filters.append(f"timestamp_column >= '{filter_context.start_time.replace(tzinfo=None)}'::timestamp")
        if filter_context.end_time:
            filters.append(f"timestamp_column < '{filter_context.end_time.replace(tzinfo=None)}'::timestamp")
        if filters:
            query += " WHERE " + " AND ".join(filters)

    # Use pandas to execute the query and return a pandas DataFrame
    with psycopg.connect(uri) as conn:
        return pd.read_sql(query, conn)

# Use in the BatchSource
postgres_ds = BatchSource(name="postgres_ds", batch_config=postgres_config)

# Test data source connection
postgres_ds.get_dataframe().to_pandas().head(10)
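Once the connection test succeeds, the Batch Source can be consumed by a Batch Feature View like any other. The following is a minimal sketch, assuming a user entity with a user_id join key and hypothetical user_id, timestamp_column, and lifetime_value columns in the source table:
from datetime import timedelta
from tecton import Attribute, Entity, batch_feature_view
from tecton.types import Field, Float64, String

user = Entity(name="user", join_keys=[Field("user_id", String)])

@batch_feature_view(
    sources=[postgres_ds],
    entities=[user],
    mode="pandas",
    timestamp_field="timestamp_column",
    features=[Attribute("lifetime_value", Float64)],
    batch_schedule=timedelta(days=1),
)
def user_features(postgres_data):
    # Select only the join key, timestamp, and feature columns
    return postgres_data[["user_id", "timestamp_column", "lifetime_value"]]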
Configure the Materialization Environment
psycopg is supported by default in Tecton core environments >=1.1.0, such as tecton-rift-core-1.1.0. To use other libraries (such as those mentioned in this doc), please create a custom environment containing the PostgreSQL connection library of your choice.
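For example, to run the first example above in materialization, the custom environment's requirements file would need the ADBC driver. The package names below are a sketch; pin versions that match what you tested locally.
# requirements.txt — illustrative; pin versions to match local testing
adbc-driver-postgresql
pyarrow
Register the requirements file as a custom environment with the Tecton CLI and reference that environment from your feature views; see the custom environment documentation for the exact workflow.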