Version: 1.2

Publish Full Features to a Custom Sink

Private Preview

This feature is currently in Private Preview.

Users can write features to custom storage via a user-defined function.

Define a sink_config to write to storage

```python
from tecton.vnext import sink_config
from tecton import Secret


@sink_config(
    name="postgres_sink",  # Defaults to the function name
    mode="pandas",
    secrets={"key": Secret(scope="scopeName", key="keyName")},
)
def sink(df, context):
    # User code which runs during the feature publish job
    ...
```

The available modes are pyspark, pyarrow, and pandas; they correspond to the compute environment used in the deployment. The selected mode determines the data type of the input DataFrame (df).

| Mode | Input `df` type |
| --- | --- |
| `pyspark` | Spark DataFrame |
| `pandas` | pandas DataFrame |
| `pyarrow` | `RecordBatchReader` |

For a feature publish job with start time st and end time et, the input DataFrame (df) contains the results of get_features_in_range(st, et) for that time period. The following is an example df:

| id | TITLE | description_embedding | _valid_from | _valid_to |
| --- | --- | --- | --- | --- |
| 1 | "Sample Title 1" | [0.043933567, 0.058934413, 0.04817833, 0.07754808...] | 2025-01-02 00:00:00+00:00 | 2026-01-01 00:00:00+00:00 |
| 2 | "Sample Title 2" | [-0.0057690353, 0.024765287, 0.050806455, 0.020478217...] | 2025-01-02 00:00:00+00:00 | 2026-01-01 00:00:00+00:00 |
| 3 | "Sample Title 3" | [-0.032477397, -0.03246777, 0.03540772, 0.0077085723...] | 2025-01-03 00:00:00+00:00 | 2026-01-02 00:00:00+00:00 |
| 4 | "Sample Title 4" | [-0.023161052, 0.021010065, 0.08599099, -0.03261379...] | 2025-01-06 00:00:00+00:00 | 2026-01-05 00:00:00+00:00 |
| 5 | "Sample Title 5" | [-0.032477397, -0.03246777, 0.03540772, 0.0077085723...] | 2025-01-02 00:00:00+00:00 | 2026-01-01 00:00:00+00:00 |
| 6 | "Sample Title 6" | [0.021010065, 0.08599099, -0.03261379, 0.013094089...] | 2025-01-04 00:00:00+00:00 | 2026-01-03 00:00:00+00:00 |

The sink function receives additional information that may be necessary for configuring the storage system. The context object contains:

  • secrets - The secrets defined in the sink_config metadata
  • feature_view_id - The ID of the Feature View. This changes whenever the object is created or recreated.
  • feature_view_name
  • workspace
  • start_time and end_time - The time range for which complete features are being retrieved

Users can use these fields to define custom behavior for where to store data, how to overwrite data, etc.
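For example, the publish window can drive an idempotent storage layout. A minimal sketch that derives a partition path from the context fields (the `Ctx` stand-in class and path scheme are illustrative, not Tecton APIs):

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Ctx:  # stand-in for the context object passed to the sink
    feature_view_id: str
    workspace: str
    start_time: datetime
    end_time: datetime


def partition_path(context) -> str:
    # One path per (feature view, publish window): re-running the same job
    # overwrites its own partition instead of appending duplicate rows.
    return (
        f"{context.workspace}/{context.feature_view_id}/"
        f"{context.start_time:%Y-%m-%dT%H-%M-%S}_{context.end_time:%Y-%m-%dT%H-%M-%S}"
    )


ctx = Ctx(
    feature_view_id="fv_123",
    workspace="prod",
    start_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
    end_time=datetime(2025, 1, 2, tzinfo=timezone.utc),
)
print(partition_path(ctx))  # prod/fv_123/2025-01-01T00-00-00_2025-01-02T00-00-00
```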

The following is an example of storing vector embeddings in a Postgres instance:

```python
@sink_config(
    mode="pandas",
    secrets={
        "user": Secret(scope="postgres_secrets", key="username"),
        "password": Secret(scope="postgres_secrets", key="password"),
        "host": Secret(scope="postgres_secrets", key="host"),
    },
)
def postgres_sink(df, context):
    import psycopg

    # Create a new table if it doesn't exist.
    # If storage data should be reset whenever an old Feature View is
    # invalidated, use `context.feature_view_id`; if not,
    # `context.feature_view_name` can be used.

    # Database connection details
    host = context.secrets["host"]
    dbname = context.workspace
    user = context.secrets["user"]
    password = context.secrets["password"]

    # Connect to the database
    conn = psycopg.connect(host=host, dbname=dbname, user=user, password=password)
    cursor = conn.cursor()

    # SQL query to insert data.
    # Upsert depending on the end_time of the publish job to ensure older jobs
    # don't overwrite newer data.
    insert_query = f"""
        INSERT INTO {context.feature_view_id} (id, title, embedding, last_updated)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (id)
        DO UPDATE SET
            id = EXCLUDED.id,
            embedding = CASE
                WHEN EXCLUDED.last_updated > {context.feature_view_id}.last_updated THEN EXCLUDED.embedding
                ELSE {context.feature_view_id}.embedding
            END,
            last_updated = CASE
                WHEN EXCLUDED.last_updated > {context.feature_view_id}.last_updated THEN EXCLUDED.last_updated
                ELSE {context.feature_view_id}.last_updated
            END,
            title = CASE
                WHEN EXCLUDED.last_updated > {context.feature_view_id}.last_updated THEN EXCLUDED.title
                ELSE {context.feature_view_id}.title
            END;
    """

    # Data to insert; the embedding column name matches the Embedding feature
    # defined on the Feature View below.
    data_to_insert = [
        (row.id, row.TITLE, "[" + ",".join(map(str, row.description_embedding)) + "]", context.end_time)
        for row in df.itertuples(index=False)
    ]
    cursor.executemany(insert_query, data_to_insert)
    rows_inserted = cursor.rowcount
    print(f"Number of rows inserted: {rows_inserted}")

    conn.commit()

    # Close the cursor and connection
    if cursor:
        cursor.close()
    if conn:
        conn.close()
```
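The upsert above requires a table with a unique `id` to already exist. A minimal sketch of matching DDL, built the same way the insert query is (the table name and the `TEXT` embedding column are assumptions consistent with the string formatting used in the sink; with the pgvector extension, a `vector(384)` column could be used instead):

```python
table = "fv_abc123"  # stand-in for context.feature_view_id in the sink

# DDL matching the columns the sink inserts; ON CONFLICT (id) relies on the
# PRIMARY KEY constraint.
create_query = f"""
CREATE TABLE IF NOT EXISTS {table} (
    id           BIGINT PRIMARY KEY,
    title        TEXT,
    embedding    TEXT,          -- the sink writes embeddings as "[...]" strings
    last_updated TIMESTAMPTZ
);
"""
print(create_query)
```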


```python
@batch_feature_view(
    description="Embeddings",
    sources=[product_descriptions],
    entities=[id],
    mode="pandas",
    features=[
        Attribute(name="TITLE", dtype=String),
        Embedding(
            input_column=Field("DESCRIPTION", String),
            model="sentence-transformers/all-MiniLM-L6-v2",
            name="description_embedding",
        ),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2025, 1, 1),
    batch_schedule=timedelta(days=1),
    tags={"team": "finance", "status": "production"},
    timestamp_field="timestamp",
    environment="sample_environment",
    batch_compute=RiftBatchConfig(
        # NOTE: we recommend using L4 GPU instances for Embeddings inference
        instance_type="g6.xlarge",
    ),
    publish_features_config=PublishFeaturesConfig(
        publish_start_time=datetime(2025, 2, 1),
        publish_sink=postgres_sink,
        publish_offline=False,
    ),
)
def description_embeddings(input):
    return input[["id", "timestamp", "TITLE", "DESCRIPTION"]]
```

The connection parameters can be stored in Tecton Secrets.

Authentication to Custom Store

We recommend using Tecton Secrets for authentication to your custom store, to allow for workspace-scoped access controls. You can also authenticate with workspace-scoped instance profiles via Configure Data Source Access per Workspace. If you are using Databricks compute, this can be done self-serve via Service Principals:

Databricks Service Principals​

If you are using Databricks compute and would like to authenticate to an AWS Resource, you can utilize instance profiles. The following describes how to connect your Tecton Compute AWS Account (Account A) to the AWS Account containing your custom store (Account B).

1.) Create an IAM role that allows cross-account access to your specified resource in Account B:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DynamoDB",
      "Effect": "Allow",
      "Action": ["dynamodb:*"],
      "Resource": [
        "arn:aws:dynamodb:${REGION}:${ACCOUNT_B_ID}:table/tecton-${DEPLOYMENT_NAME}*"
      ]
    },
    {
      "Sid": "AllowAssumeRole",
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::${ACCOUNT_B_ID}:role/${NAME_OF_ROLE_IN_ACCOUNT_B}"
    }
  ]
}
```

The above grants permissions on a DynamoDB table. The trusted identity should be the AWS account ID of the account your Tecton compute runs in (Account A):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::${ACCOUNT_A_ID}:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {}
    }
  ]
}
```

This can also be scoped down to the role you create in step 2 below.

Note: For S3, this can be done directly via the bucket policy.
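For example, a bucket policy on the custom-store bucket in Account B could grant Account A access directly (the bucket name placeholder and the chosen actions are illustrative; scope them to what your sink actually needs):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowTectonComputeAccount",
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::${ACCOUNT_A_ID}:root" },
      "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::${CUSTOM_STORE_BUCKET}",
        "arn:aws:s3:::${CUSTOM_STORE_BUCKET}/*"
      ]
    }
  ]
}
```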

2.) Set up Databricks jobs in Account A to provide access. Create an instance profile in AWS Account A. If the role in Account B is scoped down to role-level trust, this instance profile's role should match it. The instance profile requires all the basic permissions needed for materialization jobs; see how to Configure Databricks.

3.) Create a Databricks Service Principal that will be assigned to your provided workspace, and attach the instance profile created in step 2 to it. Follow the instructions to create Databricks Service Principals within Tecton. This allows users to assign a default service principal per workspace or per Feature View.
