Read Multiple Feature Vectors using Wildcards
Tecton allows you to fetch a set of feature vectors by specifying only a subset of a feature view's entity keys. This functionality is commonly used when multiple candidates need to be scored, such as in a recommendation system.
In this example, we'll show how to retrieve feature vectors for all ads a user has seen in the past week. We'll walk through:
- Configuring the feature view with an online serving index
- Retrieving features online
- Creating training data
Configuring your feature views

First, when defining the feature view, specify the `online_serving_index` parameter and omit the key you won't provide at retrieval time. In this case, we will specify the user at feature retrieval time and get back a row for each ad the user has feature values for.
```python
from datetime import datetime, timedelta

from tecton import stream_feature_view, Aggregate
from tecton.types import Field, Int32

from core.entities import user
from ads.entities import ad
from ads.data_sources.ad_impressions_stream import ad_impressions_stream


@stream_feature_view(
    source=ad_impressions_stream,
    entities=[user, ad],
    online_serving_index=["user_uuid"],  # Only the user_uuid will be used at retrieval time
    mode="spark_sql",
    aggregation_interval=timedelta(hours=1),
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=12)),
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=24)),
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=72)),
        Aggregate(input_column=Field("impression", Int32), function="count", time_window=timedelta(hours=168)),
    ],
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2021, 1, 1),
    description="The count of impressions between a given user and a given ad",
)
def user_ad_impression_counts(ad_impressions_stream):
    return f"""
        select
            user_uuid,
            ad_id,
            1 as impression,
            timestamp
        from
            {ad_impressions_stream}
        """
```
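The feature view above imports `user` and `ad` entities from the feature repository. For reference, here is a minimal sketch of what those entity definitions might look like; the join key names are assumptions inferred from the columns selected in the query above, not definitions taken from the actual repo:

```python
from tecton import Entity
from tecton.types import Field, String

# Hypothetical entity definitions; the join key names match the
# columns selected in the feature view's transformation.
user = Entity(name="user", join_keys=[Field("user_uuid", String)])
ad = Entity(name="ad", join_keys=[Field("ad_id", String)])
```

Because the feature view lists both entities but serves only on `user_uuid`, the online store keeps `ad_id` as a wildcard dimension that fans out at lookup time.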
Now that we've specified the serving index for the Feature View, let's create a Feature Service to enable online retrieval.
```python
from tecton import FeatureService

from feature_repo.shared.features.user_ad_impression_counts_wildcard import (
    user_ad_impression_counts,
)

ctr_prediction_service = FeatureService(
    name="ctr_prediction_service",
    description="A Feature Service used for supporting a CTR prediction model.",
    online_serving_enabled=True,
    features=[user_ad_impression_counts],
)
```
Fetching wildcard features online

Once these changes have been applied, we can use the Tecton Python library to retrieve a dataframe containing the features for every ad that matches our user, by omitting the `ad_id` join key.
```python
import tecton

ws = tecton.get_workspace("prod")
my_fs = ws.get_feature_service("ctr_prediction_service")

# Only the online serving index key is provided; ad_id is omitted,
# so one row is returned per matching ad.
keys = {"user_uuid": "sample-user-uuid"}
response = my_fs.query_features(keys).to_pandas()
print(response.head())
```
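Since the response contains one row per ad, a common next step is to rank the candidates. A minimal pandas sketch, assuming the response dataframe has an `ad_id` column and a 7-day impression-count column (the exact column names Tecton returns for the aggregates may differ):

```python
import pandas as pd

# Stand-in for the dataframe returned by query_features(...).to_pandas();
# the column names here are assumptions for illustration.
response = pd.DataFrame(
    {
        "ad_id": ["ad-1", "ad-2", "ad-3"],
        "impression_count_168h": [12, 3, 27],
    }
)

# Rank candidate ads by how often this user has already seen them,
# e.g. to apply frequency capping before scoring.
ranked = response.sort_values("impression_count_168h", ascending=False)
least_seen = ranked.iloc[-1]["ad_id"]  # the ad shown least in the past week
```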
Alternatively, we can use the HTTP API. See the section above for more detail on how to configure the API key.
```shell
$ export TECTON_API_KEY='<your_tecton_key>'
$ curl -X POST https://<your_cluster>.tecton.ai/api/v1/feature-service/get-features \
  -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
  '{
    "params": {
      "workspace_name": "prod",
      "feature_service_name": "ctr_prediction_service",
      "join_key_map": {
        "user_uuid": "sample-user-uuid"
      }
    }
  }'
```
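The same request can also be issued from Python. Here is a sketch that builds the request body and (commented out) posts it with the third-party `requests` library; the cluster URL is a placeholder, exactly as in the curl example:

```python
import json
import os

# import requests  # uncomment to actually send the request

payload = {
    "params": {
        "workspace_name": "prod",
        "feature_service_name": "ctr_prediction_service",
        # Only the wildcard serving index key is supplied; ad_id is omitted.
        "join_key_map": {"user_uuid": "sample-user-uuid"},
    }
}

body = json.dumps(payload)

# resp = requests.post(
#     "https://<your_cluster>.tecton.ai/api/v1/feature-service/get-features",
#     headers={"Authorization": f"Tecton-key {os.environ['TECTON_API_KEY']}"},
#     data=body,
# )
```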
Creating training sets with wildcard features
Similarly, we can construct our training dataset by providing a prediction context that contains the join key we specified as our serving index.
```python
import tecton

events = spark.read.parquet("dbfs:/event_data.pq").select("user_uuid", "timestamp")

ws = tecton.get_workspace("prod")
my_fs = ws.get_feature_service("ctr_prediction_service")

training_set = my_fs.get_features_for_events(events, timestamp_key="timestamp")
print(training_set.to_pandas().head())
```
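Because each event row in the spine fans out into one training row per matching ad, labels keyed by both `user_uuid` and `ad_id` can be merged back in afterwards. A pandas sketch with made-up data (the feature column name is an assumption for illustration):

```python
import pandas as pd

# Stand-in for training_set.to_pandas(); each spine row fans out
# into one row per ad the user saw in the window.
training_df = pd.DataFrame(
    {
        "user_uuid": ["u1", "u1", "u2"],
        "ad_id": ["ad-1", "ad-2", "ad-3"],
        "impression_count_168h": [5, 2, 9],
    }
)

# Click labels keyed on the full (user, ad) pair.
labels = pd.DataFrame(
    {
        "user_uuid": ["u1", "u1", "u2"],
        "ad_id": ["ad-1", "ad-2", "ad-3"],
        "clicked": [1, 0, 0],
    }
)

# Left-join so every training row keeps its features even if unlabeled.
labeled = training_df.merge(labels, on=["user_uuid", "ad_id"], how="left")
```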