Using the Stream Ingest API
This feature is currently in Private Preview.
- There are no SLOs for the Stream Ingest API.
- The Stream Ingest API for Tecton on Snowflake is under development. Please reach out to Tecton Support if this is something you are interested in.
The Stream Ingest API allows for synchronous and scalable ingestion of events into Tecton's online store with millisecond latency. The Stream Ingest API is an HTTP API that accepts JSON records and therefore has no dependency on Spark Streaming.
"API", where used below, refers to the Stream Ingest API.
The API is serverless; users do not need to be concerned with provisioning, infrastructure management, or scaling.
The API can be called directly from a microservice or from a stream processor to ingest data into Tecton.
Data ingested through the API is written to the online store and, optionally, the offline store. The online store keeps only the most recently pushed value, so any previous values for an entity may be overwritten; for example, if two click events for the same `user_id` are pushed in sequence, only the second remains in the online store.
Creating a Stream Feature View With a Push Source
Using the Stream Ingest API requires Tecton 0.6 or higher.
To use the API, you will need a `StreamFeatureView` with a `PushSource`.
In addition to the metadata needed to define Tecton objects, a `PushSource` needs a `schema` and optionally a `batch_config`:
- `schema`: This is the schema of the push source, i.e. the schema of the events that will be pushed to Tecton, and to the online store. We will use this schema to validate the JSON users send to Tecton. The types supported by the API are the same ones supported by the Feature Server.
- `batch_config`: The configuration for a batch source that backs the push source. It must contain at least all the columns defined in the schema. You can use `post_processor` to match values to the push source schema. The batch source defined by the `batch_config` contains the stream's historical data. It will be used to backfill feature data into the online store, and to materialize feature data into the offline store for historical feature retrieval.
The following example declares a `PushSource` object.
from tecton import PushSource, HiveConfig
from tecton.types import String, Int64, Timestamp, Field
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]

def post_processor_batch(df):
    from pyspark.sql.functions import col

    df = df.select(
        col("user_id").cast("string").alias("user_id"),
        col("timestamp").cast("timestamp").alias("timestamp"),
        col("clicked").cast("long").alias("clicked"),
    )
    return df

impressions_event_source = PushSource(
    name="impressions_event_source",
    schema=input_schema,
    batch_config=HiveConfig(
        database="demo_ads",
        table="impressions_batch",
        post_processor=post_processor_batch,
        timestamp_field="timestamp",
    ),
    description="Sample Push Source for ad impression events",
    owner="emma@tecton.ai",
    tags={"release": "production"},
)
You can then create a `StreamFeatureView` with the `PushSource` you created.
from datetime import datetime, timedelta
from tecton import StreamFeatureView, FilteredSource, BatchTriggerType
click_push_fv = StreamFeatureView(
    name="click_events_fv",
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    tags={"release": "production"},
    owner="emma@tecton.ai",
    description="The count of ad clicks for a user",
    batch_trigger=BatchTriggerType.MANUAL,
)
- When the `online` parameter is set to `False`, events sent to the Stream Ingest API won't be written to the online store, but will still be validated.
Once applied successfully, this Push Source and Stream Feature View are ready for streaming ingest. Please note, there may be a few seconds of delay after `tecton apply` before the API can accept records for the Push Source.
Transformations
The Stream Ingest API supports running Python- and pandas-based transformations, both at request time and during backfill materialization. The API supports row-based transformations on the input records. Filtering records or creating additional records is not supported.
Transformation support in the Stream Ingest API requires a beta version of the Tecton SDK, which can be installed as follows:
pip install -U "tecton>=0.7b"
Stream feature views with transformations can be specified as follows:
from datetime import datetime, timedelta
from tecton import stream_feature_view, FilteredSource, BatchTriggerType
from tecton.types import Field, String, Timestamp, Int64
output_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
    Field(name="clicked_squared", dtype=Int64),
]

@stream_feature_view(
    name="user_clicks_fv_with_python_udf",
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    tags={"release": "production"},
    owner="achal@tecton.ai",
    mode="python",
    schema=output_schema,
    batch_trigger=BatchTriggerType.MANUAL,
)
def content_keyword_click_counts_python(user_click_push_source):
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source
Compared to Stream Feature Views without Push Sources, Stream Feature Views with Push Sources are required to contain the following additional fields:
- `mode`: The mode of the transformation. For Stream Feature Views with Push Sources, this must be `python` or `pandas`. `python` transformations are applied to a single record at a time, while `pandas` transformations may be applied to one or many records, depending on how many records are ingested in a single request; see the sketch after this list for a `pandas`-mode example.
- `schema`: The schema of the output records generated by the Feature View. Tecton validates that the output records of the transformations match this expected schema.
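For comparison, here is a minimal sketch of the same transformation in `pandas` mode. The feature view name is hypothetical; everything else mirrors the `python`-mode example above, and the function receives a pandas DataFrame that may hold one or many records per request.
from datetime import datetime, timedelta
from tecton import stream_feature_view, FilteredSource, BatchTriggerType

@stream_feature_view(
    name="user_clicks_fv_with_pandas_udf",  # hypothetical name
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    mode="pandas",
    schema=output_schema,  # same output schema as the python-mode example
    batch_trigger=BatchTriggerType.MANUAL,
)
def content_keyword_click_counts_pandas(user_click_push_source):
    # The input DataFrame may contain several records; the operation is vectorized.
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source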
Time-Windowed Aggregations
Tecton will store all ingested events in the online store and run the full aggregation at request time. You can find the full list of the aggregations supported by Tecton here.
from datetime import datetime, timedelta
from tecton import StreamFeatureView, FilteredSource, Aggregation
click_count_push_fv = StreamFeatureView(
    name="click_count_push_fv",
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,  # When online=False, the ingest operation will perform validation but won't write to the online store.
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregations=[
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=1)),
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=12)),
        Aggregation(column="clicked", function="sum", time_window=timedelta(hours=24)),
    ],
    tags={"release": "production"},
    owner="emma@tecton.ai",
    description="The aggregated count of ad clicks for a user",
)
Time-Windowed Aggregations and Transformations
Time-Windowed aggregations and transformations can be used together. In this case, the transformations are applied first, and then the aggregations are applied on top of the transformed values.
from datetime import datetime, timedelta
from tecton import stream_feature_view, FilteredSource, Aggregation, BatchTriggerType
from tecton.types import Field, String, Timestamp, Int64
output_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
    Field(name="clicked_squared", dtype=Int64),
]

@stream_feature_view(
    name="click_count_push_fv",
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,  # When online=False, the ingest operation will perform validation but won't write to the online store.
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregations=[
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=1)),
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=12)),
        Aggregation(column="clicked_squared", function="sum", time_window=timedelta(hours=24)),
    ],
    tags={"release": "production"},
    owner="emma@tecton.ai",
    description="The aggregated count of ad clicks for a user",
    mode="python",
    schema=output_schema,
    batch_trigger=BatchTriggerType.MANUAL,
)
def content_keyword_click_counts_python(user_click_push_source):
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source
When using both transformations and time-windowed aggregations, the `schema` parameter refers to the schema of the records after transformation. Tecton computes the schema of the post-aggregation records automatically.
Using the low-latency REST API to ingest events into Tecton
Authenticating with an API Key
To authenticate your requests to the Stream Ingest API, you will need to create a Service Account, and grant that Service Account the Editor role for your workspace:

1. Create the Service Account to obtain your API key.

tecton service-account create \
    --name "stream-ingest-service-account" \
    --description "A Stream Ingest API sample"

Output:

Save this API Key - you will not be able to get it again.
API Key: <Your-api-key>
Service Account ID: <Your-Service-Account-Id>

2. Assign the Editor role to that Service Account.

tecton access-control assign-role --role editor \
    --workspace <Your-workspace> \
    --service-account <Your-Service-Account-Id>

Output:

Successfully updated role.

3. Export the API key as an environment variable named `TECTON_API_KEY` or add the key to your secret manager.

export TECTON_API_KEY="my-key-code"
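If you are calling the API from Python rather than from the shell, a minimal sketch for loading the key exported above and building the authorization header (the variable names are illustrative):

import os

# Read the key exported in the previous step; raises KeyError if it is missing.
api_key = os.environ["TECTON_API_KEY"]

# The Stream Ingest API expects this authorization header on every request.
headers = {"Authorization": f"Tecton-key {api_key}"}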
Making an HTTP API Call
Note: The following example uses cURL as the HTTP client and can be executed from the command line, but the HTTP call is the same for any client.
To ingest events into the Tecton Online Store, you can use the `https://preview.<your_cluster>.tecton.ai/ingest` endpoint.
Sample Request
$ curl -X POST https://preview.<your_cluster>.tecton.ai/ingest \
  -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
'{
  "workspace_name": "ingestapi-live",
  "dry_run": false,
  "records": {
    "click_event_source": [
      {
        "record": {
          "timestamp": "2023-01-04T00:25:06Z",
          "content_keyword": "apple",
          "clicked": 1
        }
      },
      {
        "record": {
          "timestamp": "2022-12-29T00:25:06Z",
          "content_keyword": "kiwi",
          "clicked": 3
        }
      }
    ],
    "user_click_event_source": [
      {
        "record": {
          "timestamp": "2023-01-02T00:25:06Z",
          "user_id": "C18947Z5",
          "clicked": 2
        }
      },
      {
        "record": {
          "timestamp": "2022-12-29T00:25:06Z",
          "user_id": "C98FG569",
          "clicked": 4
        }
      }
    ]
  }
}'
- `workspace_name`: The name of the workspace where the PushSource(s) are defined.
- `dry_run` (optional): When set to `true`, the request will be validated but no events will be written to the online store. This parameter can be used for debugging and/or verifying the correctness of the request.
- `records`: A map of PushSource name to an array of records to be ingested. Note that a single Stream Ingest API request can carry a batch of records for multiple sources.
When the Stream Ingest API receives the above request, it performs validations (see below for more information) and, on success, writes the batch of records for each Feature View subscribed to the corresponding PushSource.
Sample Success Response
{
  "workspaceName": "ingestapi-live",
  "ingestMetrics": {
    "featureViewIngestMetrics": [
      {
        "featureViewName": "keyword_push_fv",
        "onlineRecordIngestCount": "2"
      },
      {
        "featureViewName": "user_push_fv",
        "onlineRecordIngestCount": "2"
      }
    ]
  }
}
A successful response from the Stream Ingest API includes metrics on the number of records ingested per Feature View.
Sample Error Response
An error response from the Stream Ingest API may contain a `RequestError` or a `RecordError`, depending on the type of error encountered.
Request Errors include errors such as a missing required field in the request, an incorrectly formatted request, an invalid workspace name, etc.
{
  "requestError": {
    "errorMessage": "Required field `workspace_name` is missing in the request",
    "errorType": "RequestValidationError"
  }
}
Record Errors include any error encountered while processing the batch of records in the request. Some examples are a missing event timestamp column, an invalid timestamp format, an event timestamp outside of the serving TTL for the Feature View, etc.
{
  "workspaceName": "ingestapi-live",
  "recordErrors": [
    {
      "featureViewName": "keyword_ingest_fv",
      "pushSourceName": "click_event_source",
      "errorType": "TectonObjectValidationError",
      "errorMessage": "timestamp value is outside of the serving TTL defined for Feature View keyword_push_fv"
    }
  ]
}
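Putting the pieces together, here is a minimal Python sketch of a client that posts a batch of records and distinguishes the response shapes shown above. It assumes the `requests` library plus the hypothetical cluster, workspace, and push source names used in the samples:

import os
import requests

# Assumed endpoint; substitute your own cluster name.
INGEST_URL = "https://preview.<your_cluster>.tecton.ai/ingest"

payload = {
    "workspace_name": "ingestapi-live",
    "dry_run": False,  # set to True to validate without writing to the online store
    "records": {
        "click_event_source": [
            {"record": {"timestamp": "2023-01-04T00:25:06Z", "content_keyword": "apple", "clicked": 1}},
        ],
    },
}

response = requests.post(
    INGEST_URL,
    json=payload,
    headers={"Authorization": f"Tecton-key {os.environ['TECTON_API_KEY']}"},
)

body = response.json()
if response.ok:
    # Success responses report per-feature-view ingest metrics.
    for metrics in body["ingestMetrics"]["featureViewIngestMetrics"]:
        print(metrics["featureViewName"], metrics["onlineRecordIngestCount"])
elif "requestError" in body:
    print("Request error:", body["requestError"]["errorMessage"])
else:
    for record_error in body.get("recordErrors", []):
        print("Record error:", record_error["errorMessage"])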
Stream Ingest API Validations
Note: When `dry_run=true`, the request payload will be validated but the events will not actually be ingested into the online store.
- If `dry_run` is set to `false` (or the parameter is absent from the request), the workspace must be a live workspace with at least one Stream Feature View that subscribes to the corresponding PushSource in the request and has online writes enabled (`online=True`).
- The schema of each record in the request must match the schema of the corresponding PushSource, as defined during `tecton apply`.
- The event timestamp must be within the serving TTL defined for the Stream Feature View.
- Additionally, fields with type `Timestamp` in the PushSource schema must be provided as an RFC 3339 formatted string in the JSON payload. If using Java, the `DateTimeFormatter.ISO_OFFSET_DATE_TIME` formatter produces a correctly formatted string from a date-time instance. For example,
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
...
ZonedDateTime dateTime = ZonedDateTime.now();
System.out.println(dateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
If using Python, the `datetime.isoformat()` method produces a correctly formatted string from a timezone-aware `datetime` instance. For example,
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
print(now.isoformat())
Here is a list of the possible HTTP response codes:
| Status Code | Reason |
|---|---|
| 200 OK | Success. Features have been ingested into the online store. |
| 400 Bad Request | Incorrectly formatted request body (JSON syntax errors); missing required fields (`push_source_name`, `push_record_map`, `workspace_name`) in the request; schema mismatch between a pushed record and the schema declared in the Push Source definition. |
| 401 Unauthorized | Missing or invalid `Tecton-key` authorization header. |
| 404 Not Found | Workspace, push source, or Feature Views subscribed to the push source not found. |
| 500 Internal Server Error | Internal server error. Please contact Tecton Support for assistance. |
Offline Logging via Stream Ingest API
Tecton also supports logging data to the offline store via the Stream Ingest API. When a Stream Feature View with a Push Source has offline materialization enabled (using `offline=True`), any data sent to the Stream Ingest API is logged to the offline store after it completes validations and, if enabled, online ingestion.
Creating PushSource and StreamFeatureView for Offline Logging
In addition to offline logging via the Stream Ingest API, you can either configure backfilling via a batch source as described in the example above, or skip batch backfilling entirely. Below is an example of a `PushSource` without a `batch_config` parameter.
from tecton import PushSource
from tecton.types import String, Int64, Timestamp, Field
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]

impressions_event_source = PushSource(
    name="impressions_event_source",
    schema=input_schema,
    description="Sample Push Source for ad impression events",
    owner="pooja@tecton.ai",
    tags={"release": "production"},
)
Here is a Stream Feature View that subscribes to the above PushSource:
from datetime import datetime, timedelta
from tecton import StreamFeatureView
click_push_fv = StreamFeatureView(
    name="click_events_fv",
    source=impressions_event_source,
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2023, 1, 1),
    ttl=timedelta(days=7),
    tags={"release": "production"},
    owner="pooja@tecton.ai",
    description="The count of ad clicks for a user",
)
Any data sent to the Stream Ingest API for the PushSource `impressions_event_source` will be written to both the online and offline store for the Feature View `click_events_fv`. Since the Feature View's Data Source has no `batch_config` configured, no materialization jobs will be scheduled for backfilling.
- The parameters `batch_schedule` and `batch_trigger_type` cannot be configured when the StreamFeatureView's PushSource has no `batch_config`.
- If both online and offline writes are enabled for the Feature View, the Stream Ingest API will perform validations and write to the online store first, before logging the data to the offline store.
- If only offline writes are enabled, the Stream Ingest API will perform validations and log the data to the offline store.
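To confirm that logged data reached the offline store, one option is to read it back with the Tecton SDK. A minimal sketch, assuming the live workspace is named ingestapi-live and that some events have already been ingested for click_events_fv:

from datetime import datetime
import tecton

# Assumed workspace name; use the live workspace you applied the objects to.
ws = tecton.get_workspace("ingestapi-live")
fv = ws.get_feature_view("click_events_fv")

# Retrieve a time range of offline feature data; logged records appear once ingestion completes.
df = fv.get_historical_features(
    start_time=datetime(2023, 1, 1),
    end_time=datetime(2023, 2, 1),
).to_pandas()
print(df.head())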