Stream Feature View with Rift

Public Preview

This feature is currently in Public Preview.

This feature has the following limitations:
  • Cannot leverage AWS PrivateLink for network controls
If you have questions or want to share feedback, please file a feature request.

The Stream Ingest API is an endpoint to ingest or update features in the Tecton Feature Store with sub-second latency. Records sent to the Stream Ingest API are immediately written to the Tecton Feature Store and made available for training and inference.

The Stream Ingest API seamlessly integrates with Rift, Tecton's managed compute engine, to optionally execute feature transformations.

By using the Stream Ingest API to send data to the Tecton Feature Platform, ML teams can:

  • Easily build powerful streaming features on event data using Rift: Rift and Tecton's Aggregation Engine enable Data Scientists and ML Engineers to author and manage transformations in familiar Python, allowing you to skip the complicated code and heavy stream-processing infrastructure required by other solutions.
  • Integrate Tecton with any existing streaming feature pipeline without migrating feature code: The Stream Ingest API gives teams the data management, serving, governance, and monitoring benefits of Tecton's Feature Platform on top of their existing feature pipelines, without rewriting any feature code. Feature pipelines that already work don't need to be migrated before adopting an enterprise feature platform, making it faster and easier for ML and DS teams to get their features centrally managed for trusted and reliable training and serving.
  • Bring read-after-write consistency to your feature infrastructure: The Stream Ingest API can block until input data has been fully processed and corresponding features are updated, making it easy for your application to push event data to the feature platform and quickly retrieve up-to-date feature vectors, which is very useful for event-driven decisioning applications like loan approvals and fraud monitoring.

Using the Stream Ingest API

Creating a simple Stream Feature View with a Stream Source

To write features with the Stream Ingest API, create a Stream Source and then define a Stream Feature View using that Stream Source as the data source. The example below is based on the ads data in the Tecton sample repo. A more detailed example of creating a Stream Feature View using the fraud data from that same repo can be found in Building Streaming Features.

The following example declares a Stream Source object.

from tecton import PushConfig, FileConfig, StreamSource
from tecton.types import String, Int64, Timestamp, Field

input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]

batch_config = FileConfig(uri="s3://path/to/my/parquet/file", timestamp_field="timestamp", file_format="parquet")

impressions_event_source = StreamSource(
    name="impressions_event_source",
    schema=input_schema,
    stream_config=PushConfig(),
    batch_config=batch_config,
    description="A stream source for synchronous, online ingestion of ad-click events with user info.",
)
  • schema: This is the schema of records that will be sent to the Stream Ingest API. Stream Sources with a Push Config need schemas defined explicitly, and records pushed to the Stream Ingest API must conform to the schema. See Data Types for a list of supported types.
  • batch_config is used to backfill data to the online and offline stores, as well as for scheduled offline materialization. Note that the schema of the Batch Source must contain at least all the columns defined in the schema of the Stream Source.

You can then create a Stream Feature View with the Stream Source you created. In this example, we simply want to serve the records sent to the Stream Ingest API as-is, so no transformation is defined.

from datetime import datetime, timedelta
from tecton import Entity, StreamFeatureView
from tecton.types import Field, String

user = Entity(
    name="ads_user",
    join_keys=[Field("user_id", String)],
    description="A user of the platform",
    owner="demo-user@tecton.ai",
    tags={"release": "production"},
)

click_events_fv = StreamFeatureView(
    name="click_events_fv",
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 1, 1),
    ttl=timedelta(days=7),
    batch_schedule=timedelta(days=1),
    description="The count of ad clicks for a user",
)
  • When the parameter online is set to True, Tecton will automatically kick off jobs to backfill feature data from feature_start_time to the online store. After that, any records sent to the Stream Ingest API will be immediately written to the Online Store.
  • When the parameter offline is set to True, Tecton will kick off jobs to backfill feature data from feature_start_time to the offline store. Frontfill jobs to the offline store will then run on the defined batch_schedule.

Once applied successfully, the Stream Source and the Stream Feature View are ready to receive records from the Stream Ingest API. There may be a few minutes of delay after running tecton apply before the API can accept records for the Stream Source.

Stream Feature View with a Stream Source on Google Cloud

The main difference when using a Stream Feature View with a Stream Source on Google Cloud is that you must use Bigtable or Redis as the online store. The following example shows how to create a Stream Source and a Stream Feature View on Google Cloud. For details on how to set up online stores, see the individual guides for Bigtable and Redis.

from tecton import (
    PushConfig,
    StreamSource,
    pandas_batch_config,
    StreamFeatureView,
    Entity,
    BigtableConfig,
    OfflineStoreConfig,
    BatchTriggerType,
    DeltaConfig,
)
from tecton.types import String, Int64, Timestamp, Field, Float64

from datetime import datetime, timedelta


FEATURE_START_TIME = datetime(2023, 5, 10)

entity = Entity(name="user_id", join_keys=[Field("user_id", String)])

input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="value", dtype=Int64),
    Field(name="score", dtype=Float64),
]


@pandas_batch_config(
    supports_time_filtering=True,
)
def batch_config(filter_context):
    import pandas

    cols = ["user_id", "timestamp", "value", "score"]
    data = [
        ["1", "2023-05-10T23:59:00", 1, -47.437337],
        ["1", "2023-05-12T00:00:00", 2, 36.837372],
        ["2", "2023-05-13T12:00:00", 3, -20.573452],
        ["2", "2023-05-14T23:59:59", 4, 15.298372],
        ["1", "2023-05-15T00:00:00", 5, -9.283623],
    ]
    return pandas.DataFrame(data=data, columns=cols).astype({"timestamp": "datetime64[us]"})


impressions_event_source = StreamSource(
    name="event_source",
    schema=input_schema,
    stream_config=PushConfig(),
    batch_config=batch_config,
    description="A stream source for synchronous, online ingestion of ad-click events with user info.",
)

StreamFeatureView(
    name="feature_view",
    mode="pandas",
    entities=[entity],
    source=impressions_event_source,
    online=True,
    offline=True,
    offline_store=DeltaConfig(),
    feature_start_time=FEATURE_START_TIME,
    batch_schedule=timedelta(days=1),
    batch_trigger=BatchTriggerType.MANUAL,
    online_store=BigtableConfig(),
    schema=[
        Field("user_id", String),
        Field("timestamp", Timestamp),
        Field("value", Int64),
        Field("score", Float64),
    ],
)

For more information on BigtableConfig, see the SDK docs.

Authenticating requests with an API key

To authenticate your requests to the Stream Ingest API, you will need to create a Service Account, and grant that Service Account the Editor role for your workspace. The Editor role is needed to actually write data to the Feature Store, but a Viewer role is sufficient for dry_run=True requests.

  1. Create the Service Account to obtain your API key.
tecton service-account create \
  --name "stream-ingest-service-account" \
  --description "A Stream Ingest API sample"

Output:

Save this API Key - you will not be able to get it again.
API Key: <Your-api-key>
Service Account ID: <Your-Service-Account-Id>
  2. Assign the Editor role to that Service Account.
tecton access-control assign-role --role editor \
  --workspace <Your-workspace> \
  --service-account <Your-Service-Account-Id>

Output:

Successfully updated role.
  3. Export the API key as an environment variable named TECTON_API_KEY or add the key to your secret manager.
export TECTON_API_KEY="my-key-code"

Sending Data to the Stream Ingest API

Note: The following example uses cURL as the HTTP client and can be executed from the command line, but the HTTP call is the same for any client.

The Stream Ingest API accepts data in JSON format over HTTP. To ingest events into Tecton, you can use the https://preview.<your_cluster>.tecton.ai/ingest endpoint.

For a Google Cloud deployment, the endpoint is https://<region>.<your_cluster>.tecton.ai/ingest. The region is the region where your Tecton deployment is hosted, e.g. us-west1. For more details on setting up Tecton on Google Cloud, see the Google Cloud Deployment Guide.

Sample Request

The example below shows a cURL request that sends a batch of 2 records to the Stream Ingest API. In this example:

  • workspace_name: name of the workspace where the Stream Source(s) are defined
  • dry_run: When set to true, the request will be validated and the transformations executed, but nothing will be written to the Online Store. This parameter can be used for debugging and verifying the correctness of a request.
  • records: Contains a map of Stream Source names to arrays of records to be ingested. Note that a single Stream Ingest API request can carry batches of records for multiple sources.
$ curl -X POST https://preview.<your_cluster>.tecton.ai/ingest \
  -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
  '{
    "workspace_name": "ingestapi-test",
    "dry_run": false,
    "records": {
      "impressions_event_source": [
        {
          "record": {
            "timestamp": "2023-01-02T00:25:06Z",
            "user_id": "C18947Z5",
            "clicked": 1
          }
        },
        {
          "record": {
            "timestamp": "2022-12-29T00:25:06Z",
            "user_id": "C98FG569",
            "clicked": 1
          }
        }
      ]
    }
  }'
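
If you are calling the API from Python rather than the command line, the same request can be made with any standard HTTP client. The following is a minimal sketch using the third-party requests library; the cluster URL, workspace name, and record values are the same illustrative placeholders used above.

import os

import requests

# Assumes TECTON_API_KEY was exported as shown earlier; <your_cluster> is a placeholder.
response = requests.post(
    "https://preview.<your_cluster>.tecton.ai/ingest",
    headers={"Authorization": f"Tecton-key {os.environ['TECTON_API_KEY']}"},
    json={
        "workspace_name": "ingestapi-test",
        "dry_run": False,
        "records": {
            "impressions_event_source": [
                {"record": {"timestamp": "2023-01-02T00:25:06Z", "user_id": "C18947Z5", "clicked": 1}},
            ],
        },
    },
)
response.raise_for_status()  # Raises on 4xx/5xx responses
print(response.json())  # Ingest metrics, as shown in the sample response below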

Sample Response

A successful response from the Stream Ingest API includes metrics on the number of records ingested per Feature View. If there are multiple Stream Feature Views using the same Stream Source, any record sent to the Stream Ingest API for the Stream Source will be written to each dependent Feature View.

{
  "workspaceName": "ingestapi-live",
  "ingestMetrics": {
    "featureViewIngestMetrics": [
      {
        "featureViewName": "click_events_fv",
        "onlineRecordIngestCount": "2",
        "offlineRecordIngestCount": "2",
        "featureViewId": "b6f14021557d3f2ba95247add3642820"
      }
    ]
  }
}

Reading Ingested Records from the Online Store

To read ingested data for Feature Views, you must define a Feature Service that includes the Feature View with online serving enabled, as shown below.

from tecton import FeatureService
from ads.features.stream_features.click_events_fv import click_events_fv

ad_ctr_feature_service = FeatureService(
    name="ad_ctr_feature_service",
    description="A Feature Service providing features for a model that predicts if a user will click an ad.",
    online_serving_enabled=True,
    features=[
        click_events_fv,
    ],
)

Once applied successfully, you can then retrieve features from the online store using the FeatureService HTTP API:

$ curl -X POST https://<your_cluster>.tecton.ai/api/v1/feature-service/get-features \
  -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
  '{
    "params": {
      "workspace_name": "ingestapi-live",
      "feature_service_name": "ad_ctr_feature_service",
      "join_key_map": {
        "user_id": "C18947Z5"
      }
    }
  }'

Reading Records from the Offline Store

Records ingested into the Offline Store can be retrieved via the Feature Service's get_features_for_events() method in the Python SDK. Refer to the SDK reference for more information.
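
As an illustration, here is a minimal sketch using the Python SDK; the workspace and Feature Service names are taken from the examples above, and the events dataframe is an illustrative placeholder.

import pandas as pd
import tecton

# Assumes the live workspace and Feature Service from the examples above.
ws = tecton.get_workspace("ingestapi-live")
fs = ws.get_feature_service("ad_ctr_feature_service")

# Events (spine) with the join keys and timestamps to retrieve features for.
events = pd.DataFrame(
    {
        "user_id": ["C18947Z5", "C98FG569"],
        "timestamp": [pd.Timestamp("2023-01-02 01:00:00"), pd.Timestamp("2023-01-02 01:00:00")],
    }
)

features = fs.get_features_for_events(events).to_pandas()
print(features.head())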

Stream Ingest API Error Handling

Error Responses

An error response from the Stream Ingest API contains either a RequestError or a RecordError, depending on the type of error encountered. Request Errors include problems such as a missing required field in the request, an incorrectly formatted request, or an invalid workspace name:

{
  "requestError": {
    "errorMessage": "Required field `workspace_name` is missing in the request",
    "errorType": "RequestValidationError"
  }
}

Record Errors include any error encountered while processing the batch of records in the request. Some examples are a missing event timestamp column, an invalid timestamp format, or an event timestamp outside of the serving TTL for the Feature View:

{
  "workspaceName": "ingestapi-live",
  "recordErrors": [
    {
      "featureViewName": "keyword_ingest_fv",
      "pushSourceName": "click_event_source",
      "errorType": "TectonObjectValidationError",
      "errorMessage": "timestamp value is outside of the serving TTL defined for Feature View keyword_push_fv"
    }
  ]
}

HTTP Response Status Codes

Here is a list of the possible HTTP response codes:

Status Code | Reason
----------- | ------
200 OK | Success. Features have been ingested into the online store.
400 Bad Request | Incorrectly formatted request body (JSON syntax errors); missing required fields (dry_run, records, workspace_name) in the request; or a schema mismatch between the pushed record and the schema declared in the Stream Source definition.
401 Unauthorized | Missing or invalid Tecton-key authorization header.
403 Forbidden | The Service Account associated with the API key is not authorized to perform a write operation for the workspace.
404 Not Found | The workspace, Stream Source, or Feature Views subscribed to the Stream Source were not found.
500 Internal Server Error | Internal server error. Please contact Tecton Support for assistance.

Stream Ingest API Expectations

  • If dry_run is set to false in the Stream Ingest API request, the workspace must be a live workspace with at least one Stream Feature View that is subscribed to the corresponding Stream Source in the request and has online writes enabled (online=True).
  • The schema of each record in the request must match the schema of the corresponding Stream Source, as defined during tecton apply
  • The event timestamp in the record must be within the serving TTL defined for the Stream Feature View
  • Fields with type Timestamp in the Stream Source schema must have the timestamp provided as an RFC 3339 formatted string in the JSON payload. If using Java, the DateTimeFormatter.ISO_OFFSET_DATE_TIME formatter produces a correctly formatted string from a DateTime instance.

If using Python, datetime.isoformat() produces a correctly formatted string from a datetime instance. For example:

from datetime import datetime, timezone

now = datetime.now(timezone.utc)
print(now.isoformat())
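# e.g. prints: 2024-01-02T00:25:06.123456+00:00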

Stream Ingest API Service Level Objectives

The Service Level Objective (SLO) for the Stream Ingest API at https://preview.<your_cluster>.tecton.ai/ingest is 99.95%, measured over the trailing 30 days.

This SLO measurement does not include requests with a 4xx response code. Note that during sudden increases in request volume, there may be an increase in 429 response codes until the service has scaled to the appropriate capacity.
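
If your client is sensitive to these transient 429s, a simple retry with exponential backoff is usually sufficient. The sketch below is illustrative only: the ingest_with_retry helper is hypothetical, and the endpoint and payload are the placeholders from the earlier example.

import os
import time

import requests

def ingest_with_retry(payload, max_attempts=5):
    # Hypothetical helper: retries the ingest request on 429 responses
    # with exponential backoff while the service scales up.
    url = "https://preview.<your_cluster>.tecton.ai/ingest"
    headers = {"Authorization": f"Tecton-key {os.environ['TECTON_API_KEY']}"}
    for attempt in range(max_attempts):
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code != 429:
            response.raise_for_status()  # Surface non-retryable errors
            return response.json()
        time.sleep(2**attempt)  # Back off: 1s, 2s, 4s, ...
    raise RuntimeError("Ingest request still throttled after retries")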
