Version: 1.1

Read Multiple Feature Vectors using Wildcards

Tecton allows you to fetch multiple feature vectors by specifying only a subset of a feature view's entity join keys. This is commonly used when multiple candidates need to be scored, such as in a recommendation system.
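
Conceptually, a wildcard lookup behaves like a scan over a table keyed by the full entity key, filtered on the keys you do supply. The sketch below illustrates only that idea in plain Python. It is not Tecton's storage implementation, and the in-memory dict, the `wildcard_lookup` helper, and the feature name `impression_count_24h` are hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical online store: one feature vector per full key (user_uuid, ad_id).
OnlineStore = Dict[Tuple[str, str], Dict[str, int]]


def wildcard_lookup(store: OnlineStore, user_uuid: str) -> List[Dict[str, object]]:
    """Return one row per ad_id stored for the given user (ad_id is the wildcard)."""
    rows = []
    for (user, ad_id), features in store.items():
        if user == user_uuid:
            # The omitted join key (ad_id) comes back as a column of each row.
            rows.append({"user_uuid": user, "ad_id": ad_id, **features})
    # Sorted only to make this example's output deterministic.
    return sorted(rows, key=lambda row: row["ad_id"])


store: OnlineStore = {
    ("user-1", "ad-a"): {"impression_count_24h": 3},
    ("user-1", "ad-b"): {"impression_count_24h": 1},
    ("user-2", "ad-a"): {"impression_count_24h": 7},
}

for row in wildcard_lookup(store, "user-1"):
    print(row)
```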

In this example, we'll show how to retrieve feature vectors for all ads a user has seen in the past week. We'll walk through:

  • Configuring the feature view with an online serving index
  • Retrieving features online
  • Creating training data

Configuring your feature views

First, when defining the feature view, set the online_serving_index parameter to the join keys you will supply at retrieval time, omitting the key you want to match as a wildcard. In this case, we will specify only the user at retrieval time and get back a row for each ad that user has feature values for.

from tecton import stream_feature_view, Aggregate
from core.entities import user
from ads.entities import ad
from ads.data_sources.ad_impressions_stream import ad_impressions_stream
from datetime import datetime, timedelta
from tecton.types import Field, Int32


@stream_feature_view(
    source=ad_impressions_stream,
    entities=[user, ad],
    online_serving_index=["user_uuid"],  # Only the user_uuid will be used at retrieval time
    mode="spark_sql",
    aggregation_interval=timedelta(hours=1),
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=12)),
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=24)),
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=72)),
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=168)),
    ],
    online=True,  # Materialize online so the Feature Service can serve these features
    offline=False,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2021, 1, 1),
    description="The count of impressions between a given user and a given ad",
)
def user_ad_impression_counts(ad_impressions_stream):
    # The output must include each entity's join key. This assumes the user
    # entity's join key is user_uuid, matching online_serving_index above.
    return f"""
        select
            user_uuid,
            ad_id,
            1 as impression,
            timestamp
        from
            {ad_impressions_stream}
        """

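Each Aggregate in the feature view above is a count over a trailing time window, computed separately for every (user, ad) pair. The following sketch reproduces that arithmetic in plain Python for a single pair, assuming a window covers (now - window, now]. It does not model Tecton's exact window boundaries or its alignment to the 1-hour aggregation_interval, and the `windowed_counts` helper and the `impression_count_<N>h` names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List

# The five window lengths (in hours) used by the feature view above.
WINDOWS_HOURS = [1, 12, 24, 72, 168]


def windowed_counts(impressions: List[datetime], now: datetime) -> Dict[str, int]:
    """Count impressions falling in each trailing window (now - window, now]."""
    counts = {}
    for hours in WINDOWS_HOURS:
        start = now - timedelta(hours=hours)
        # Hypothetical feature name; Tecton generates its own column names.
        counts[f"impression_count_{hours}h"] = sum(
            1 for ts in impressions if start < ts <= now
        )
    return counts


now = datetime(2024, 1, 8, 12, 0)
impressions = [
    now - timedelta(minutes=30),  # inside every window
    now - timedelta(hours=6),     # inside the 12h window and wider
    now - timedelta(hours=48),    # inside the 72h window and wider
    now - timedelta(days=10),     # outside even the one-week window
]
print(windowed_counts(impressions, now))
```
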
Now that we've specified our serving indices for the Feature View, let's create our Feature Service to enable online retrieval.

from tecton import FeatureService
from feature_repo.shared.features.user_ad_impression_counts_wildcard import (
    user_ad_impression_counts,
)

ctr_prediction_service = FeatureService(
    name="ctr_prediction_service",
    description="A Feature Service used for supporting a CTR prediction model.",
    online_serving_enabled=True,
    features=[user_ad_impression_counts],
)

Fetching wildcard features online

Once these changes have been applied (for example, with tecton apply), we can use the Tecton Python SDK to retrieve a dataframe with a row for every ad that has feature values for our user, by omitting the ad_id join key.

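import tecton

ws = tecton.get_workspace("prod")
my_fs = ws.get_feature_service("ctr_prediction_service")

# Supply only the serving-index key; omitting ad_id makes it the wildcard.
keys = {"user_uuid": "sample-user-uuid"}

# Returns one row per ad that has feature values for this user.
response = my_fs.query_features(keys).to_pandas()
print(response.head())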

Alternatively, we can use the HTTP API. See the section above for more detail on how to configure the API key.

$ export TECTON_API_KEY='<your_tecton_key>'

$ curl -X POST https://<your_cluster>.tecton.ai/api/v1/feature-service/get-features \
  -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
  '{
    "params": {
      "workspace_name": "prod",
      "feature_service_name": "ctr_prediction_service",
      "join_key_map": {
        "user_uuid": "sample-user-uuid"
      }
    }
  }'
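
The same request can be prepared from Python with only the standard library. This sketch builds the body with json.dumps, which rules out hand-written JSON mistakes such as trailing commas, and wraps it in a urllib.request.Request. The cluster URL is a placeholder, the `build_request` helper is hypothetical, and nothing is sent unless you pass the request to urlopen.

```python
import json
import os
import urllib.request

# Placeholder: substitute your own cluster name.
URL = "https://<your_cluster>.tecton.ai/api/v1/feature-service/get-features"


def build_request(user_uuid: str, api_key: str) -> urllib.request.Request:
    """Prepare (but do not send) the request from the curl example."""
    body = {
        "params": {
            "workspace_name": "prod",
            "feature_service_name": "ctr_prediction_service",
            # Only the serving-index key is supplied; ad_id is omitted.
            "join_key_map": {"user_uuid": user_uuid},
        }
    }
    return urllib.request.Request(
        URL,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Tecton-key {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_request(
    "sample-user-uuid", os.environ.get("TECTON_API_KEY", "<your_tecton_key>")
)
# To send it: urllib.request.urlopen(req).read()
```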

Creating training sets with wildcard features

Similarly, we can construct a training dataset by providing an events dataframe that contains the join key we specified as our serving index, along with a timestamp column.

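import tecton

# Assumes a Databricks notebook (or similar) where `spark` is a predefined SparkSession.
events = spark.read.parquet("dbfs:/event_data.pq").select("user_uuid", "timestamp")

ws = tecton.get_workspace("prod")
my_fs = ws.get_feature_service("ctr_prediction_service")

# Each event carries only the serving-index key and a timestamp.
training_set = my_fs.get_features_for_events(events, timestamp_key="timestamp")

print(training_set.to_pandas().head())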
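
Assuming the offline join mirrors the online behavior, each event that carries only a user_uuid expands into one row per ad, with features computed as of that event's timestamp. The sketch below shows this expansion in plain Python under simplifying assumptions: a single one-week count, windows of (timestamp - 168h, timestamp], and an ad counted only if it has an impression in that window. The `training_rows` helper, the feature name, and the sample data are hypothetical, and this is not how Tecton computes training data internally.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

WINDOW = timedelta(hours=168)  # the one-week window from the feature view


def training_rows(
    events: List[Tuple[str, datetime]],
    impressions: List[Tuple[str, str, datetime]],
) -> List[Dict[str, object]]:
    """Expand each (user_uuid, timestamp) event into one row per ad.

    Only impressions in (timestamp - WINDOW, timestamp] are counted, so no
    row uses data from after its own event (point-in-time correctness).
    """
    rows = []
    for user_uuid, ts in events:
        counts: Dict[str, int] = {}
        for imp_user, ad_id, imp_ts in impressions:
            if imp_user == user_uuid and ts - WINDOW < imp_ts <= ts:
                counts[ad_id] = counts.get(ad_id, 0) + 1
        for ad_id in sorted(counts):
            rows.append({
                "user_uuid": user_uuid,
                "timestamp": ts,
                "ad_id": ad_id,
                "impression_count_168h": counts[ad_id],  # hypothetical name
            })
    return rows


t0 = datetime(2024, 1, 1)
impressions = [
    ("user-1", "ad-a", t0 + timedelta(hours=1)),
    ("user-1", "ad-b", t0 + timedelta(hours=2)),
    ("user-1", "ad-a", t0 + timedelta(hours=30)),  # after the first event
]
events = [("user-1", t0 + timedelta(hours=24)), ("user-1", t0 + timedelta(hours=48))]

for row in training_rows(events, impressions):
    print(row)
```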
