
Using the Stream Ingest API

Private Preview

This feature is currently in Private Preview.

This feature has the following limitations:
  • There are no SLOs for the Stream Ingest API.
  • The Stream Ingest API for Tecton on Snowflake is under development. Please reach out to Tecton Support if this is something you are interested in.
Please file a feature request if you would like to participate in the preview.

The Stream Ingest API allows for synchronous and scalable ingestion of events into Tecton's online store with millisecond latency. The Stream Ingest API is an HTTP API that accepts JSON records and, as such, has no dependency on Spark Streaming.

info

"API", where used below, refers to the Stream Ingest API.

The API is serverless; users do not need to be concerned with provisioning, infrastructure management, or scaling.

The API can be called directly from a microservice or from a stream processor to ingest data into Tecton.

Data ingested through the API is written to the online store, and optionally to the offline store. The online store retains only the most recently pushed value for an entity, so any previous values may be overwritten.

Creating a Stream Feature View With a Push Source​

info

Using the Stream Ingest API requires Tecton SDK version 0.6 or higher.

To use the API, you will need a StreamFeatureView with a PushSource.

In addition to the metadata needed to define Tecton objects, a PushSource needs a schema and optionally a batch_config:

  • schema: The schema of the push source, i.e. the schema of the events that will be pushed to Tecton and written to the online store. Tecton uses this schema to validate the JSON records users send. The types supported by the API are the same as those supported by the Feature Server.
  • batch_config: The configuration for a batch source that backs the push source. The batch source must contain at least all the columns defined in the schema; you can use a post_processor to map values to the push source schema. The batch source defined by the batch_config contains the stream's historical data. It will be used to backfill feature data into the online store, and to materialize feature data into the offline store for historical feature retrieval.

The following example declares a PushSource object.

from tecton import PushSource, HiveConfig
from tecton.types import String, Int64, Timestamp, Field

input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]


def post_processor_batch(df):
    from pyspark.sql.functions import col

    df = df.select(
        col("user_id").cast("string").alias("user_id"),
        col("timestamp").cast("timestamp").alias("timestamp"),
        col("clicked").cast("long").alias("clicked"),
    )
    return df


impressions_event_source = PushSource(
    name="impressions_event_source",
    schema=input_schema,
    batch_config=HiveConfig(
        database="demo_ads",
        table="impressions_batch",
        post_processor=post_processor_batch,
        timestamp_field="timestamp",
    ),
    description="Sample Push Source for ad impression events",
    owner="emma@tecton.ai",
    tags={"release": "production"},
)

You can then create a StreamFeatureView with the PushSource you created.

from datetime import datetime, timedelta
from tecton import StreamFeatureView, FilteredSource, BatchTriggerType

click_push_fv = StreamFeatureView(
    name="click_events_fv",
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    tags={"release": "production"},
    owner="emma@tecton.ai",
    description="The count of ad clicks for a user",
    batch_trigger=BatchTriggerType.MANUAL,
)
  • When the online parameter is set to False, events sent to the Stream Ingest API won't be written to the Online Store, but will still be validated.

Once applied successfully, this Push Source and Stream Feature View are ready for streaming ingest. Please note that there may be a delay of a few seconds after tecton apply before the API can accept records for the Push Source.
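
One way to confirm that the API is ready to accept records is to send a dry-run ingestion request (the /ingest endpoint, authentication, and the dry_run parameter are all described later on this page). Here is a minimal sketch, assuming the requests library and the TECTON_API_KEY environment variable set up as shown below; the workspace name and record values are placeholders:

import os

import requests

# dry_run=True validates the request without writing any events to the
# online store, so it is safe to use as a readiness check.
response = requests.post(
    "https://preview.<your_cluster>.tecton.ai/ingest",  # replace <your_cluster>
    headers={"Authorization": f"Tecton-key {os.environ['TECTON_API_KEY']}"},
    json={
        "workspace_name": "<your-workspace>",
        "dry_run": True,
        "records": {
            "impressions_event_source": [
                {"record": {"user_id": "C18947Z5", "timestamp": "2023-01-04T00:25:06Z", "clicked": 1}}
            ]
        },
    },
)
print(response.status_code, response.json())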

Transformations​

The Stream Ingest API supports running Python- and Pandas-based transformations, both at request time and during backfill materialization. The API supports row-based transformations on the input records; filtering records or creating additional records is not supported.

Transformation support in the Stream Ingest API requires a beta version of the Tecton SDK, which can be installed as follows:

pip install -U "tecton>=0.7b"

Stream feature views with transformations can be specified as follows:

from datetime import datetime, timedelta
from tecton import stream_feature_view, FilteredSource, BatchTriggerType
from tecton.types import Field, String, Timestamp, Int64

output_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
    Field(name="clicked_squared", dtype=Int64),
]


@stream_feature_view(
    name="user_clicks_fv_with_python_udf",
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    tags={"release": "production"},
    owner="achal@tecton.ai",
    mode="python",
    schema=output_schema,
    batch_trigger=BatchTriggerType.MANUAL,
)
def content_keyword_click_counts_python(user_click_push_source):
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source

Compared to Stream Feature Views without Push Sources, Stream Feature Views with Push Sources are required to contain the following additional fields:

  • mode: The mode of the transformation. For Stream Feature Views with Push Sources, this must be python or pandas. python transformations are applied to a single record at a time, while pandas transformations may be applied to one or many records, depending on how many records are ingested in a single request (see the sketch after this list).
  • schema: The schema of the output records generated by the Feature View. Tecton validates that the output records of the transformations match this expected schema.
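
For comparison, here is a sketch of the same transformation in pandas mode. This is illustrative rather than a definitive implementation: the feature view name is hypothetical, and the impressions_event_source, user_id, and output_schema objects are assumed from the example above.

from datetime import datetime, timedelta
from tecton import stream_feature_view, FilteredSource, BatchTriggerType


@stream_feature_view(
    name="user_clicks_fv_with_pandas_udf",  # hypothetical name
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    mode="pandas",
    schema=output_schema,
    batch_trigger=BatchTriggerType.MANUAL,
)
def user_clicks_pandas(user_click_push_source):
    # In pandas mode, user_click_push_source is a pandas DataFrame that may
    # hold one or many records from a single ingestion request.
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source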

Time-Windowed Aggregations​

Tecton will store all ingested events in the online store and run the full aggregation at request time. You can find the full list of the aggregations supported by Tecton here.

from datetime import datetime, timedelta
from tecton import StreamFeatureView, FilteredSource, Aggregation

click_count_push_fv = StreamFeatureView(
    name="click_count_push_fv",
    source=FilteredSource(impressions_event_source),
    online=True,  # When online=False, the ingest operation will perform validation but won't write to the online store.
    offline=True,
    entities=[user_id],
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregations=[
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=1)),
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=12)),
        Aggregation(column="clicked", function="sum", time_window=timedelta(hours=24)),
    ],
    tags={"release": "production"},
    owner="emma@tecton.ai",
    description="The aggregated count of ad clicks for a user",
)

Time-Windowed Aggregations and Transformations​

Time-Windowed aggregations and transformations can be used together. In this case, the transformations are applied first, and then the aggregations are applied on top of the transformed values.

from datetime import datetime, timedelta
from tecton import stream_feature_view, FilteredSource, Aggregation, BatchTriggerType
from tecton.types import Field, String, Timestamp, Int64

output_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
    Field(name="clicked_squared", dtype=Int64),
]


@stream_feature_view(
    name="click_count_push_fv",
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,  # When online=False, the ingest operation will perform validation but won't write to the online store.
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregations=[
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=1)),
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=12)),
        Aggregation(column="clicked_squared", function="sum", time_window=timedelta(hours=24)),
    ],
    tags={"release": "production"},
    owner="emma@tecton.ai",
    description="The aggregated count of ad clicks for a user",
    mode="python",
    schema=output_schema,
    batch_trigger=BatchTriggerType.MANUAL,
)
def content_keyword_click_counts_python(user_click_push_source):
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source

When using both transformations and Time-Windowed Aggregations, the schema parameter refers to the schema of the records output by the transformation, before aggregation. Tecton computes the schema of the post-aggregation records automatically.

Using the low-latency REST API to ingest events into Tecton​

Authenticating with an API Key​

To authenticate your requests to the Stream Ingest API, you will need to create a Service Account, and grant that Service Account the Editor role for your workspace:

  1. Create the Service Account to obtain your API key.

    tecton service-account create \
    --name "stream-ingest-service-account" \
    --description "A Stream Ingest API sample"

    Output:

    Save this API Key - you will not be able to get it again.
    API Key: <Your-api-key>
    Service Account ID: <Your-Service-Account-Id>
  2. Assign the Editor role to that Service Account.

    tecton access-control assign-role --role editor \
    --workspace <Your-workspace> \
    --service-account <Your-Service-Account-Id>

    Output:

    Successfully updated role.
  3. Export the API key as an environment variable named TECTON_API_KEY or add the key to your secret manager.

    export TECTON_API_KEY="my-key-code"

Making an HTTP API Call​

Note: The following example uses cURL as the HTTP client and can be executed from the command line, but the HTTP call is the same for any client.

To ingest events into the Tecton Online Store, you can use the https://preview.<your_cluster>.tecton.ai/ingest endpoint.

Sample Request

$ curl -X POST https://preview.<your_cluster>.tecton.ai/ingest \
-H "Authorization: Tecton-key $TECTON_API_KEY" -d \
'{
  "workspace_name": "ingestapi-live",
  "dry_run": false,
  "records": {
    "click_event_source": [
      {
        "record": {
          "timestamp": "2023-01-04T00:25:06Z",
          "content_keyword": "apple",
          "clicked": 1
        }
      },
      {
        "record": {
          "timestamp": "2022-12-29T00:25:06Z",
          "content_keyword": "kiwi",
          "clicked": 3
        }
      }
    ],
    "user_click_event_source": [
      {
        "record": {
          "timestamp": "2023-01-02T00:25:06Z",
          "user_id": "C18947Z5",
          "clicked": 2
        }
      },
      {
        "record": {
          "timestamp": "2022-12-29T00:25:06Z",
          "user_id": "C98FG569",
          "clicked": 4
        }
      }
    ]
  }
}'
  • workspace_name: The name of the workspace where the PushSource(s) are defined.
  • dry_run (optional): When set to true, the request is validated but no events are written to the Online Store. This parameter can be used for debugging and for verifying the correctness of the request.
  • records: A map of PushSource name to an array of records to be ingested. Note that a single Stream Ingest API request can carry batches of records for multiple sources.

When the Stream Ingest API receives the above request, it performs validations (see below for more information) and, on success, writes the batch of records for each Feature View subscribed to the corresponding PushSource.
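
If you are calling the API from a Python service rather than the command line, the same request can be made with any HTTP client library. Here is a minimal sketch using the requests library; the cluster URL, workspace name, and record values are placeholders taken from the cURL example above:

import os

import requests

# The same payload structure as the cURL example, trimmed to one source.
payload = {
    "workspace_name": "ingestapi-live",
    "dry_run": False,
    "records": {
        "click_event_source": [
            {"record": {"timestamp": "2023-01-04T00:25:06Z", "content_keyword": "apple", "clicked": 1}}
        ]
    },
}

response = requests.post(
    "https://preview.<your_cluster>.tecton.ai/ingest",  # replace <your_cluster>
    headers={"Authorization": f"Tecton-key {os.environ['TECTON_API_KEY']}"},
    json=payload,
)
print(response.status_code, response.json())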

Sample Success Response

{
  "workspaceName": "ingestapi-live",
  "ingestMetrics": {
    "featureViewIngestMetrics": [
      {
        "featureViewName": "keyword_push_fv",
        "onlineRecordIngestCount": "2"
      },
      {
        "featureViewName": "user_push_fv",
        "onlineRecordIngestCount": "2"
      }
    ]
  }
}

A successful response from the Stream Ingest API includes metrics on the number of records ingested per Feature View.

Sample Error Response

An error response from the Stream Ingest API contains either a RequestError or a RecordError, depending on the type of error encountered. Request errors include a missing required field in the request, an incorrectly formatted request, an invalid workspace name, etc.

{
  "requestError": {
    "errorMessage": "Required field `workspace_name` is missing in the request",
    "errorType": "RequestValidationError"
  }
}

Record errors include any error encountered while processing the batch of records in the request. Some examples are a missing event timestamp column, an invalid timestamp format, and an event timestamp outside of the serving TTL for the Feature View.

{
  "workspaceName": "ingestapi-live",
  "recordErrors": [
    {
      "featureViewName": "keyword_ingest_fv",
      "pushSourceName": "click_event_source",
      "errorType": "TectonObjectValidationError",
      "errorMessage": "timestamp value is outside of the serving TTL defined for Feature View keyword_push_fv"
    }
  ]
}
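
When handling responses programmatically, it can be useful to branch on which error field is present. Here is a sketch based only on the sample payloads above; the response variable is assumed to be the HTTP response object from the Python example earlier:

# A sketch of handling the three response shapes shown above.
body = response.json()

if "requestError" in body:
    # The entire request was rejected, e.g. a missing required field.
    err = body["requestError"]
    print(err["errorType"], err["errorMessage"])
elif "recordErrors" in body:
    # Individual records failed validation for specific feature views.
    for err in body["recordErrors"]:
        print(err["featureViewName"], err["errorType"], err["errorMessage"])
else:
    # Success: report per-feature-view ingestion metrics.
    for m in body["ingestMetrics"]["featureViewIngestMetrics"]:
        print(m["featureViewName"], m["onlineRecordIngestCount"])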

Stream Ingest API Validations

Note: When dry_run=True, the request payload will be validated but the events will not actually be ingested into the Online Store.

  • If dry_run is set to False (or the parameter is absent from the request), the workspace must be a live workspace with at least one Stream Feature View that is subscribed to the corresponding PushSource in the request and has online writes enabled (online=True)
  • The schema of each record in the request must match the schema of the corresponding PushSource, as defined during tecton apply
  • The event timestamp must be within the serving TTL defined for the Stream Feature View
  • Additionally, fields with type Timestamp in the PushSource schema must have the timestamp formatted as an RFC 3339 string in the JSON payload. If using Java, the DateTimeFormatter.ISO_OFFSET_DATE_TIME formatter produces a correctly formatted string from a DateTime instance. For example,
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
...
ZonedDateTime dateTime = ZonedDateTime.now();
System.out.println(dateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));

If using Python, the datetime.isoformat() method produces a correctly formatted string from a timezone-aware datetime instance. For example,

from datetime import datetime, timezone

now = datetime.now(timezone.utc)
print(now.isoformat())

Here is a list of the possible HTTP response codes:

Status Code                Reason
200 OK                     Success. Features have been ingested into the online store.
400 Bad Request            Incorrectly formatted request body (JSON syntax errors); missing required fields (push_source_name, push_record_map, workspace_name) in the request; schema mismatch between a pushed record and the schema declared in the Push Source definition.
401 Unauthorized           Missing or invalid Tecton-key authorization header.
404 Not Found              Workspace, Push Source, or Feature Views subscribed to the Push Source not found.
500 Internal Server Error  An internal server error occurred. Please contact Tecton Support for assistance.

Offline Logging via Stream Ingest API​

Tecton also supports logging data to the offline store via the Stream Ingest API. When a Stream Feature View with a Push Source has offline materialization enabled (using offline=True), any data sent to the Stream Ingest API is logged to the offline store after it completes validations and, if enabled, online ingestion.

Creating PushSource and StreamFeatureView for Offline Logging​

In addition to offline logging via the Stream Ingest API, you can either configure backfilling via a batch source, as described in the example above, or skip batch backfilling entirely. Below is an example of a PushSource without a batch_config parameter.

from tecton import PushSource
from tecton.types import Field, String, Int64, Timestamp

input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]

impressions_event_source = PushSource(
    name="impressions_event_source",
    schema=input_schema,
    description="Sample Push Source for ad impression events",
    owner="pooja@tecton.ai",
    tags={"release": "production"},
)

Here is a Stream Feature View that subscribes to the above PushSource:

from datetime import datetime, timedelta
from tecton import StreamFeatureView

click_push_fv = StreamFeatureView(
    name="click_events_fv",
    source=impressions_event_source,
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2023, 1, 1),
    ttl=timedelta(days=7),
    tags={"release": "production"},
    owner="pooja@tecton.ai",
    description="The count of ad clicks for a user",
)

Any data sent to the Stream Ingest API for the PushSource impressions_event_source will be written to both the online and the offline store for the Feature View click_events_fv. Since the Feature View's Data Source has no batch_config configured, no materialization jobs will be scheduled for backfilling.

  • The batch_schedule and batch_trigger parameters cannot be configured when the StreamFeatureView's PushSource has no batch_config.
  • If both online and offline writes are enabled for the Feature View, the Stream Ingest API performs validations and writes to the online store first, before logging the data to the offline store.
  • If only offline writes are enabled, the Stream Ingest API performs validations and logs the data to the offline store.