Stream Feature View with Rift
This feature is currently in Public Preview, with the following limitation:
- Cannot leverage AWS PrivateLink for network controls
The Stream Ingest API is an endpoint to ingest or update features in the Tecton Feature Store with sub-second latency. Records sent to the Stream Ingest API are immediately written to the Tecton Feature Store and made available for training and inference.
The Stream Ingest API seamlessly integrates with Rift, Tecton's managed compute engine, to optionally execute feature transformations.
By using the Stream Ingest API to send data to the Tecton Feature Platform, ML teams can:
- Easily build powerful streaming features on event data using Rift: Rift and Tecton's Aggregation Engine let Data Scientists and ML Engineers author and manage transformations in familiar Python. This lets you skip the complicated code and heavy stream processing infrastructure that other solutions require.
- Integrate Tecton with any existing streaming feature pipeline without migrating feature code: The Stream Ingest API gives teams the data management, serving, governance, and monitoring benefits of Tecton's Feature Platform on top of their existing feature pipelines, without rewriting any feature code. Teams don't need to migrate pipelines that already work before they start using an enterprise feature platform. This makes it faster and easier for ML and DS teams to get their features centrally managed for trusted, reliable training and serving.
- Bring read-after-write consistency to your feature infrastructure: The Stream Ingest API can block until input data has been fully processed and the corresponding features are updated. Your application can then push event data to the feature platform and quickly retrieve up-to-date feature vectors. This is very useful for event-driven decisioning applications like loan approvals and fraud monitoring.
For information about connecting data sources to Rift, see Connect Data Sources to Rift.
Using the Stream Ingest API
Creating a simple Stream Feature View with a Stream Source
To write features with the Stream Ingest API, create a Stream Source and then define a Stream Feature View that uses that Stream Source as its data source. The example below is based on the ads data in the Tecton sample repo. A more detailed example of creating a Stream Feature View, using the fraud data from the same repo, is in Building Streaming Features.
The following example declares a Stream Source object.
from tecton import PushConfig, FileConfig, StreamSource
from tecton.types import String, Int64, Timestamp, Field
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]
batch_config = FileConfig(
    uri="s3://path/to/my/parquet/file",
    timestamp_field="timestamp",
    file_format="parquet",
)
impressions_event_source = StreamSource(
    name="impressions_event_source",
    schema=input_schema,
    stream_config=PushConfig(),
    batch_config=batch_config,
    description="A stream source for synchronous, online ingestion of ad-click events with user info.",
)
- schema: The schema of the records that will be sent to the Stream Ingest API. Stream Sources with a Push Config need explicitly defined schemas, and records pushed to the Stream Ingest API must conform to the schema. See Data Types for a list of supported types.
- batch_config: Used to backfill data to the online and offline store, and for scheduled offline materialization. The schema of the Batch Source must contain at least all the columns defined in the schema of the Stream Source.
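You can check the column requirement above before running tecton apply. The following is a minimal plain-Python sketch that compares column names only, not types. The helper missing_stream_columns and the extra ad_id column are hypothetical; neither is part of the Tecton SDK or the sample repo.

```python
def missing_stream_columns(stream_columns, batch_columns):
    """Return Stream Source columns absent from the Batch Source, in schema order.

    Hypothetical helper, not a Tecton API. The Batch Source must contain at
    least every Stream Source column, so any name returned here violates that
    requirement. Only column names are compared; types are not checked.
    """
    available = set(batch_columns)
    return [name for name in stream_columns if name not in available]


# Column names from impressions_event_source above.
stream_columns = ["user_id", "timestamp", "clicked"]
# Extra batch columns (the hypothetical "ad_id") are allowed; missing ones are not.
print(missing_stream_columns(stream_columns, ["user_id", "timestamp", "clicked", "ad_id"]))  # []
print(missing_stream_columns(stream_columns, ["user_id", "timestamp"]))  # ['clicked']
```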
You can then create a Stream Feature View with the Stream Source you created. In this example, we simply want to serve the records sent to the Stream Ingest API as-is, so no transformation is defined.
from datetime import datetime, timedelta
from tecton import Entity, StreamFeatureView
from tecton.types import Field, String
user = Entity(
    name="ads_user",
    join_keys=[Field("user_id", String)],
    description="A user of the platform",
    owner="demo-user@tecton.ai",
    tags={"release": "production"},
)
click_events_fv = StreamFeatureView(
    name="click_events_fv",
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 1, 1),
    ttl=timedelta(days=7),
    batch_schedule=timedelta(days=1),
    description="The count of ad clicks for a user",
)
- When the parameter online is set to True, Tecton will automatically kick off jobs to backfill feature data from feature_start_time to the online store. After that, any records sent to the Stream Ingest API will be immediately written to the Online Store.
- When the parameter offline is set to True, Tecton will kick off jobs to backfill feature data from feature_start_time to the offline store. Frontfill jobs to the offline store will then run on the defined batch_schedule.
Once applied successfully, the Stream Source and the Stream Feature View are ready to receive records from the Stream Ingest API. There may be a few minutes of delay after tecton apply before the API can accept records for the Stream Source.
Stream Feature View with a Stream Source on Google Cloud
The main difference when using a Stream Feature View with a Stream Source on Google Cloud is that you must use Bigtable or Redis as the online store. The following example shows how to create a Stream Source and a Stream Feature View on Google Cloud. For details on how to set up online stores, see the individual guides for Bigtable and Redis.
from tecton import (
    PushConfig,
    StreamSource,
    pandas_batch_config,
    StreamFeatureView,
    Entity,
    BigtableConfig,
    BatchTriggerType,
    DeltaConfig,
)
from tecton.types import String, Int64, Timestamp, Field, Float64
from datetime import datetime, timedelta
FEATURE_START_TIME = datetime(2023, 5, 10)
entity = Entity(name="user_id", join_keys=[Field("user_id", String)])
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="value", dtype=Int64),
    Field(name="score", dtype=Float64),
]
@pandas_batch_config(
    supports_time_filtering=True,
)
def batch_config(filter_context):
    import pandas
    # Static sample rows; this example does not apply filter_context.
    cols = ["user_id", "timestamp", "value", "score"]
    data = [
        ["1", "2023-05-10T23:59:00", 1, -47.437337],
        ["1", "2023-05-12T00:00:00", 2, 36.837372],
        ["2", "2023-05-13T12:00:00", 3, -20.573452],
        ["2", "2023-05-14T23:59:59", 4, 15.298372],
        ["1", "2023-05-15T00:00:00", 5, -9.283623],
    ]
    return pandas.DataFrame(data=data, columns=cols).astype({"timestamp": "datetime64[us]"})
impressions_event_source = StreamSource(
    name="event_source",
    schema=input_schema,
    stream_config=PushConfig(),
    batch_config=batch_config,
    description="A stream source for synchronous, online ingestion of ad-click events with user info.",
)
StreamFeatureView(
    name="feature_view",
    mode="pandas",
    entities=[entity],
    source=impressions_event_source,
    online=True,
    offline=True,
    offline_store=DeltaConfig(),
    feature_start_time=FEATURE_START_TIME,
    batch_schedule=timedelta(days=1),
    batch_trigger=BatchTriggerType.MANUAL,
    online_store=BigtableConfig(),
    schema=[
        Field("user_id", String),
        Field("timestamp", Timestamp),
        Field("value", Int64),
        Field("score", Float64),
    ],
)
For more information on BigtableConfig, see the SDK docs.
Authenticating requests with an API key
To authenticate your requests to the Stream Ingest API, you will need to create a Service Account and grant that Service Account the Editor role for your workspace. The Editor role is needed to actually write data to the Feature Store, but a Viewer role is sufficient for dry_run=True requests.
- Create the Service Account to obtain your API key.
tecton service-account create \
--name "stream-ingest-service-account" \
--description "A Stream Ingest API sample"
Output:
Save this API Key - you will not be able to get it again.
API Key: <Your-api-key>
Service Account ID: <Your-Service-Account-Id>
- Assign the Editor role to that Service Account.
tecton access-control assign-role --role editor \
--workspace <Your-workspace> \
--service-account <Your-Service-Account-Id>
Output:
Successfully updated role.
- Export the API key as an environment variable named TECTON_API_KEY or add the key to your secret manager.
export TECTON_API_KEY="my-key-code"
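If you call the API from Python instead of cURL, you can build the Authorization: Tecton-key header used in the requests below from the same TECTON_API_KEY environment variable. This is a minimal sketch; the helper name tecton_auth_header is hypothetical and is not part of the Tecton SDK.

```python
import os


def tecton_auth_header(env=None):
    """Build the Authorization header used by the Stream Ingest API requests.

    Hypothetical helper. It reads the key from TECTON_API_KEY (or from the
    given mapping, which is handy for testing) and formats it as
    "Tecton-key <key>", matching the cURL examples.
    """
    env = os.environ if env is None else env
    api_key = env.get("TECTON_API_KEY")
    if not api_key:
        raise RuntimeError("TECTON_API_KEY is not set")
    return {"Authorization": f"Tecton-key {api_key}"}


print(tecton_auth_header({"TECTON_API_KEY": "my-key-code"}))
# {'Authorization': 'Tecton-key my-key-code'}
```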
Sending Data to the Stream Ingest API
Note: The following example uses cURL as the HTTP client and can be executed from the command line, but the HTTP call is the same for any client.
The Stream Ingest API accepts data in JSON format via an HTTP API. To ingest events into Tecton, use the https://preview.<your_cluster>.tecton.ai/ingest endpoint.
For a Google Cloud deployment, the endpoint is https://<region>.<your_cluster>.tecton.ai/ingest, where the region is the region where your Tecton deployment is hosted, e.g. us-west1. For more details on setting up Tecton on Google Cloud, see the Google Cloud Deployment Guide.
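The two endpoint forms above differ only in the first host label: preview for the default endpoint, the deployment region for Google Cloud. The following sketch captures that rule. The function name ingest_endpoint and the cluster name my-cluster are hypothetical.

```python
from typing import Optional


def ingest_endpoint(cluster: str, gcp_region: Optional[str] = None) -> str:
    """Return the Stream Ingest API URL for a Tecton cluster.

    Hypothetical helper. Without a region it returns the preview.<cluster>
    endpoint; for a Google Cloud deployment, the region (e.g. "us-west1")
    takes the place of "preview".
    """
    host_prefix = gcp_region if gcp_region else "preview"
    return f"https://{host_prefix}.{cluster}.tecton.ai/ingest"


print(ingest_endpoint("my-cluster"))              # https://preview.my-cluster.tecton.ai/ingest
print(ingest_endpoint("my-cluster", "us-west1"))  # https://us-west1.my-cluster.tecton.ai/ingest
```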
Sample Request
The example below shows a cURL request that sends a batch of 2 records to the Stream Ingest API. In this example:
- workspace_name: The name of the workspace where the Stream Source(s) are defined.
- dry_run: When set to true, the request is validated and the transformations are executed, but nothing is written to the Online Store. Use this parameter to debug a request or verify that it is correct.
- records: A map of Stream Source names to arrays of records to be ingested. A single Stream Ingest API request can carry batches of records for multiple sources.
$ curl -X POST https://preview.<your_cluster>.tecton.ai/ingest \
    -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
    '{
      "workspace_name": "ingestapi-test",
      "dry_run": false,
      "records": {
        "impressions_event_source": [
          {
            "record": {
              "timestamp": "2023-01-02T00:25:06Z",
              "user_id": "C18947Z5",
              "clicked": 1
            }
          },
          {
            "record": {
              "timestamp": "2022-12-29T00:25:06Z",
              "user_id": "C98FG569",
              "clicked": 1
            }
          }
        ]
      }
    }'
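The same request can be built with Python's standard library. The sketch below constructs the POST request without sending it, so you can inspect the body first. The function build_ingest_request and the cluster name my-cluster are hypothetical. The explicit JSON Content-Type header is an assumption: the cURL example does not set one.

```python
import json
import urllib.request


def build_ingest_request(url, api_key, workspace_name, records_by_source, dry_run=False):
    """Build, but do not send, a POST request shaped like the cURL example above.

    records_by_source maps each Stream Source name to a list of plain record
    dicts; each record is wrapped as {"record": {...}} to match the request body.
    """
    body = {
        "workspace_name": workspace_name,
        "dry_run": dry_run,
        "records": {
            source: [{"record": record} for record in records]
            for source, records in records_by_source.items()
        },
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Tecton-key {api_key}",
            # Assumption: explicit JSON content type; the cURL example omits it.
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Hypothetical cluster name; the record mirrors the first one in the cURL example.
request = build_ingest_request(
    url="https://preview.my-cluster.tecton.ai/ingest",
    api_key="my-key-code",
    workspace_name="ingestapi-test",
    records_by_source={
        "impressions_event_source": [
            {"timestamp": "2023-01-02T00:25:06Z", "user_id": "C18947Z5", "clicked": 1},
        ]
    },
    dry_run=True,  # validate only; nothing is written to the Online Store
)
# To send it: with urllib.request.urlopen(request) as resp: print(json.load(resp))
```

Setting dry_run=True while you experiment means only the validation and transformation steps run, and a Viewer role is enough.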
Sample Response
A successful response from the Stream Ingest API includes metrics on the number of records ingested per Feature View. If there are multiple Stream Feature Views using the same Stream Source, any record sent to the Stream Ingest API for the Stream Source will be written to each dependent Feature View.
{
  "workspaceName": "ingestapi-live",
  "ingestMetrics": {
    "featureViewIngestMetrics": [
      {
        "featureViewName": "click_events_fv",
        "onlineRecordIngestCount": "2",
        "offlineRecordIngestCount": "2",
        "featureViewId": "b6f14021557d3f2ba95247add3642820"
      }
    ]
  }
}
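In the sample response, the per-Feature-View counts are JSON strings ("2"), not numbers. The following sketch turns the response into a simple mapping of Feature View name to (online, offline) counts. The helper name is hypothetical. Treating a missing count as 0 is an assumption, not documented behavior.

```python
import json


def ingest_counts(response_body: str) -> dict:
    """Map each Feature View name to its (online, offline) record ingest counts.

    Counts arrive as strings in the sample response, so they are converted to
    int. Assumption: a missing count field is treated as 0.
    """
    payload = json.loads(response_body)
    metrics = payload.get("ingestMetrics", {}).get("featureViewIngestMetrics", [])
    return {
        m["featureViewName"]: (
            int(m.get("onlineRecordIngestCount", 0)),
            int(m.get("offlineRecordIngestCount", 0)),
        )
        for m in metrics
    }


sample = json.dumps({
    "workspaceName": "ingestapi-live",
    "ingestMetrics": {"featureViewIngestMetrics": [{
        "featureViewName": "click_events_fv",
        "onlineRecordIngestCount": "2",
        "offlineRecordIngestCount": "2",
        "featureViewId": "b6f14021557d3f2ba95247add3642820",
    }]},
})
print(ingest_counts(sample))  # {'click_events_fv': (2, 2)}
```

One entry appears per dependent Feature View, so several Stream Feature Views on the same Stream Source each show up in the result.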
Reading Ingested Records from the Online Store
To read ingested data for Feature Views, define a Feature Service that includes the Feature View with online serving enabled, as shown below:
from tecton import FeatureService
from ads.features.stream_features.click_events_fv import click_events_fv
ad_ctr_feature_service = FeatureService(
    name="ad_ctr_feature_service",
    description="A Feature Service providing features for a model that predicts if a user will click an ad.",
    online_serving_enabled=True,
    features=[
        click_events_fv,
    ],
)
Once the Feature Service is applied successfully, you can retrieve features from the online store using the FeatureService HTTP API:
$ curl -X POST https://<your_cluster>.tecton.ai/api/v1/feature-service/get-features \
    -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
    '{
      "params": {
        "workspace_name": "ingestapi-live",
        "feature_service_name": "ad_ctr_feature_service",
        "join_key_map": {
          "user_id": "C18947Z5"
        }
      }
    }'
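For a Python client, the following sketch builds the same get-features request with the standard library but does not send it. The function name build_get_features_request and the cluster name my-cluster are hypothetical. The URL path and body fields are copied from the cURL example.

```python
import json
import urllib.request


def build_get_features_request(cluster, api_key, workspace_name, feature_service_name, join_key_map):
    """Build, but do not send, a get-features request matching the cURL example above."""
    body = {
        "params": {
            "workspace_name": workspace_name,
            "feature_service_name": feature_service_name,
            "join_key_map": join_key_map,
        }
    }
    return urllib.request.Request(
        f"https://{cluster}.tecton.ai/api/v1/feature-service/get-features",
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Tecton-key {api_key}"},
        method="POST",
    )


# "my-cluster" is a hypothetical cluster name.
request = build_get_features_request(
    "my-cluster", "my-key-code", "ingestapi-live", "ad_ctr_feature_service", {"user_id": "C18947Z5"}
)
# To send it: with urllib.request.urlopen(request) as resp: print(json.load(resp))
```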
Reading Records from the Offline Store
Records ingested to the Offline Store can be retrieved with the Feature Service's get_features_for_events() method in the Python SDK. Refer to the get_features_for_events() reference for more information.
Stream Ingest API Error Handling
Error Responses
An error response from the Stream Ingest API can contain a RequestError or a RecordError, depending on the type of error encountered. Request Errors include errors such as a missing required field in the request, an incorrectly formatted request, or an invalid workspace name:
{
  "requestError": {
    "errorMessage": "Required field `workspace_name` is missing in the request",
    "errorType": "RequestValidationError"
  }
}
Record Errors include any error encountered when processing the batch of records in the request, such as a missing event timestamp column, an invalid timestamp format, or an event timestamp outside of the serving TTL for the Feature View:
{
  "workspaceName": "ingestapi-live",
  "recordErrors": [
    {
      "featureViewName": "keyword_ingest_fv",
      "pushSourceName": "click_event_source",
      "errorType": "TectonObjectValidationError",
      "errorMessage": "timestamp value is outside of the serving TTL defined for Feature View keyword_push_fv"
    }
  ]
}
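Because an error body carries either a single requestError or a list of recordErrors, a client can handle both shapes in one place. The following is a minimal sketch that flattens either shape into readable messages. The helper name describe_ingest_error is hypothetical; the field names come from the two sample bodies above.

```python
import json


def describe_ingest_error(response_body: str) -> list:
    """Return readable error lines from a Stream Ingest API error body.

    A requestError means the whole request was rejected; each recordErrors
    entry names the push source and Feature View whose records failed.
    """
    payload = json.loads(response_body)
    messages = []
    request_error = payload.get("requestError")
    if request_error:
        messages.append(
            f"request: {request_error.get('errorType')}: {request_error.get('errorMessage')}"
        )
    for err in payload.get("recordErrors", []):
        messages.append(
            f"record ({err.get('pushSourceName')} -> {err.get('featureViewName')}): "
            f"{err.get('errorType')}: {err.get('errorMessage')}"
        )
    return messages


body = json.dumps({"requestError": {
    "errorMessage": "Required field `workspace_name` is missing in the request",
    "errorType": "RequestValidationError",
}})
print(describe_ingest_error(body))
```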
HTTP Response Status Codes
Here is a list of the possible HTTP response codes:
Status Code | Reason |
---|---|
200 OK | Success. Features have been ingested into the online store. |
400 Bad Request | Incorrectly formatted request body (JSON syntax errors); missing required fields (dry_run, records, workspace_name) in the request; or a schema mismatch between a pushed record and the schema declared in the Stream Source definition. |
401 Unauthorized | Missing or invalid Tecton-key authorization header. |
403 Forbidden | The Service Account associated with the API Key is not authorized to perform a write operation for the workspace. |
404 Not Found | The workspace, the Stream Source, or the Feature Views subscribed to the Stream Source were not found. |
500 Internal Server Error | Internal server error. Please contact Tecton Support for assistance. |
Stream Ingest API Expectations
- If dry_run is set to false in the Stream Ingest API request, the workspace must be a live workspace with at least one Stream Feature View that is subscribed to the corresponding Stream Source in the request and has online writes enabled (online=True).
- The schema of each record in the request must match the schema of the corresponding Stream Source, as defined during tecton apply.
- The event timestamp in the record must be within the serving TTL defined for the Stream Feature View.
- Fields with type Timestamp in the Stream Source schema must be provided as ISO 8601-compliant strings.
- If using Java, the DateTimeFormatter.ISO_OFFSET_DATE_TIME formatter produces a correctly formatted string from an OffsetDateTime or ZonedDateTime instance.
- If using Python, datetime.isoformat() produces a correctly formatted string from a datetime instance. Use a timezone-aware datetime so that the offset is included. A naive datetime has no offset, so it is read as UTC. For example:
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
print(now.isoformat())
- For other languages, the datetime string must adhere to the ISO 8601 format YYYY-MM-DDThh:mm[:ss[.fffffffff]][TZ]. The meaning of each part is listed below. Unless otherwise noted, numeric fields are pre-padded with zeros if necessary to reach the specified length.
  - YYYY is the 4-digit year
  - MM is the 2-digit month-of-year
  - DD is the 2-digit day-of-month
  - T is the capital letter T
  - hh is the 2-digit 24-hour clock hour-of-day
  - mm, preceded by a colon, is the 2-digit minute-of-hour
  - Optionally, a colon followed by two digits for the second-of-minute
  - If seconds are present, they can optionally be followed by a decimal point and up to nine digits for nanoseconds. These do not have to be pre-padded with zeros.
  - TZ is an optional offset identifier that takes one of the following forms. If it is not present, UTC time is assumed.
    - Z for UTC
    - +hh:mm or -hh:mm for an offset of hh hours and mm minutes
Stream Ingest API Service Level Objectives
The Service Level Objective (SLO) for the Stream Ingest API at https://preview.<your_cluster>.tecton.ai/ingest is 99.95%, measured over the trailing 30 days.
This SLO measurement does not include requests with a 4xx response code. Note that during sudden increases in request volume, there may be an increase in 429 response codes until the service has scaled to the appropriate capacity.
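Since 429 responses can appear while the service scales up, clients that send bursty traffic may want to retry them. The following is a minimal sketch of exponential backoff with jitter using the standard library. Retrying only 429 is an assumption drawn from the note above; Tecton does not prescribe a retry policy here. The function name and default delays are hypothetical. The opener and sleep parameters exist only so the logic can be tested without network access.

```python
import random
import time
import urllib.error
import urllib.request


def send_with_backoff(request, max_attempts=5, base_delay=0.5,
                      opener=urllib.request.urlopen, sleep=time.sleep):
    """Send request, retrying HTTP 429 responses with exponential backoff plus jitter.

    Assumption: only 429 is retried, because the SLO note ties it to the
    service scaling up; any other HTTP error is raised to the caller at once.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            with opener(request) as response:
                return response.read()
        except urllib.error.HTTPError as err:
            if err.code != 429 or attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)  # 0.5s, 1s, 2s, ... with the defaults
            sleep(delay + random.uniform(0, delay))  # jitter spreads out concurrent retries
```

Usage: pass the urllib.request.Request built for the /ingest endpoint, e.g. send_with_backoff(request), and decode the returned bytes with json.loads.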