
How To Guide: Connect Kinesis to Rift Streaming

Create a simple Stream Feature View with Rift that processes events from a Kinesis Stream.

Introduction​

This guide assumes some basic familiarity with Tecton, including how to create and apply Features. If you are new to Tecton, we recommend first checking out our Tutorials which cover a range of topics designed to bring you up to speed working with Tecton.

While following this guide, you will:

  • Create an AWS Kinesis event stream
  • Create a Tecton StreamSource with a PushConfig
  • Create a StreamFeatureView that counts events on the StreamSource
  • Create a simple AWS Lambda function, triggered by the Kinesis event stream, that acts as the connector consuming events from Kinesis and pushing them to Tecton

[Diagram: events flow from the Kinesis stream, through the Lambda function, into the Tecton StreamSource]

Prerequisites​

  1. A production Tecton workspace (this guide will not work with a lab workspace)
  2. An AWS account
  3. Familiarity with AWS Lambda, Kinesis, and CloudShell

Step 1: Create a Kinesis Event Stream​

Head to the Amazon Kinesis page in your AWS Console and create a simple stream. For this example, call it tecton-test-stream.

You can accept all the defaults and create it:

[Screenshot: Create data stream form in the Kinesis console]

Note the ARN of the stream - you will need it later.
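
If you prefer to script this step instead of clicking through the console, here is a minimal boto3 sketch (the region and the single provisioned shard are assumptions for this demo; the console defaults above work just as well):

import boto3

# Assumes your AWS credentials and region are already configured
kinesis_client = boto3.client("kinesis", region_name="us-west-2")

# Create the stream with a single provisioned shard
kinesis_client.create_stream(StreamName="tecton-test-stream", ShardCount=1)

# The stream takes a few seconds to become ACTIVE; the ARN is available right away
summary = kinesis_client.describe_stream_summary(StreamName="tecton-test-stream")
print(summary["StreamDescriptionSummary"]["StreamARN"])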

Step 2: Create a Minimal StreamSource and StreamFeatureView​

Once you have logged in to your live Tecton workspace, you can apply the following feature repository. It creates a StreamSource and StreamFeatureView.

Note that this example bootstraps the StreamSource's batch data source with mocked events. In a production setting, you would point Tecton at a data warehouse or data lake table containing a historical log of events, which is used for backfill operations.

from datetime import datetime, timedelta

from tecton import (
    Aggregate,
    Entity,
    FeatureService,
    PushConfig,
    StreamFeatureView,
    StreamSource,
    pandas_batch_config,
)
from tecton.types import Field, Int64, String, Timestamp

# Create a mock batch data source
@pandas_batch_config()
def mock_batch_source():
    import pandas as pd

    # Mock data
    events = [
        ("click", 1, "2024-11-02 12:49:00"),
        ("purchase", 2, "2024-11-02 12:49:00"),
        ("view", 3, "2024-11-02 12:49:00"),
    ]

    cols = ["event_type", "user_id", "timestamp_str"]

    df = pd.DataFrame(events, columns=cols)
    df["timestamp"] = pd.to_datetime(df["timestamp_str"])

    return df


# Create a minimal Stream Source
push_stream_source = StreamSource(
    name="push_stream_source",
    schema=[
        Field(name="event_type", dtype=String),
        Field(name="user_id", dtype=Int64),
        Field(name="timestamp", dtype=Timestamp),
    ],
    stream_config=PushConfig(),
    batch_config=mock_batch_source,
)

user = Entity(name="user_entity", join_keys=[Field("user_id", Int64)])

# Create a minimal SFV that counts the number of events on the stream source
demo_sfv = StreamFeatureView(
    name="demo_sfv",
    source=push_stream_source.unfiltered(),
    entities=[user],
    mode="pandas",
    batch_schedule=timedelta(days=1),
    timestamp_field="timestamp",
    features=[
        Aggregate(input_column=Field("event_type", String), function="count", time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field("event_type", String), function="count", time_window=timedelta(minutes=2)),
        Aggregate(input_column=Field("event_type", String), function="count", time_window=timedelta(minutes=3)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2024, 5, 1),
)

demo_fs = FeatureService(
    name="demo_fs",
    features=[demo_sfv],
)

Step 3: Connect Kinesis to your Tecton StreamSource using AWS Lambda​

3a - Create your AWS Lambda​

Head to the Lambda page in your AWS Console and create a new Lambda function. Call it something like kinesis-to-tecton.

Change the Runtime to Python 3.13 and then create the function.

[Screenshot: Create function form in the Lambda console]

3b - Give your Lambda the permission to read from Kinesis​

By default, your Lambda function will not be able to read from the Kinesis stream you created earlier.

To change that, head to IAM in your AWS console, find the role that was automatically created for your Lambda function (named something like kinesis-to-tecton-role-cpuma8bj), and add the following inline policy to it. Name the policy something like kinesis-to-tecton-lambda-policy, and make sure you adjust the Resource ARN to point at your stream:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:us-west-2:1234567890:stream/tecton-test-stream"
    }
  ]
}
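
If you would rather attach the inline policy programmatically, here is a boto3 sketch of the same step (the role name below is the example from above; substitute the role that was actually created for your Lambda, and adjust the ARN):

import json
import boto3

iam_client = boto3.client("iam")

# Attach the inline policy to the Lambda's auto-created execution role
iam_client.put_role_policy(
    RoleName="kinesis-to-tecton-role-cpuma8bj",  # placeholder - use your Lambda's role name
    PolicyName="kinesis-to-tecton-lambda-policy",
    PolicyDocument=json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:GetRecords",
                        "kinesis:GetShardIterator",
                        "kinesis:DescribeStream",
                        "kinesis:DescribeStreamSummary",
                        "kinesis:ListShards",
                        "kinesis:ListStreams",
                    ],
                    "Resource": "arn:aws:kinesis:us-west-2:1234567890:stream/tecton-test-stream",
                }
            ],
        }
    ),
)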

3c - Configure your Lambda Function to be Triggered by your Kinesis Stream​

Navigate to the configuration of your Lambda function and add a trigger:

[Screenshot: Add trigger button on the Lambda function page]

Select your previously created Kinesis Stream.

Set the batch size to 1 (or whatever size makes most sense for your use case):

[Screenshot: Kinesis trigger configuration with batch size set to 1]
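
The trigger can also be created programmatically. A boto3 sketch, assuming the stream ARN from Step 1 and the function name from step 3a:

import boto3

lambda_client = boto3.client("lambda", region_name="us-west-2")

# Wire the Kinesis stream to the Lambda function as an event source
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:us-west-2:1234567890:stream/tecton-test-stream",
    FunctionName="kinesis-to-tecton",
    StartingPosition="LATEST",
    BatchSize=1,
)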

3d - Modify your Lambda Code to Call Tecton​

Here's a simple Lambda Python script that decodes Kinesis events and pushes them to a StreamSource in Tecton. Adjust the constants at the top of the script to match your StreamSource. In a production setting, you likely would not want to hardcode those values, but rather inject them via Lambda environment variables (see the sketch after this listing).

import json
import pprint
import base64
import urllib3


# Constants
CLUSTER_NAME = "[Your-Cluster]"
API_KEY = "[Your-Service-Account-API-Key]"
WORKSPACE_NAME = "[Your-Workspace]"
STREAM_SOURCE = "[Your-Stream-Source]"  # Name must match the StreamSource you created earlier
DRY_RUN = False

http = urllib3.PoolManager()


def send_events_to_tecton(events):
    """
    Send a POST request to Tecton's ingest endpoint to submit feature data records.

    Args:
        events (list): List of events to be sent to Tecton.

    Returns:
        dict: Response from the Tecton ingest API.
    """
    records = {STREAM_SOURCE: []}
    for e in events:
        records[STREAM_SOURCE].append({"record": e})

    url = f"https://preview.{CLUSTER_NAME}.tecton.ai/v2/ingest"
    headers = {"Authorization": f"Tecton-key {API_KEY}", "Content-Type": "application/json"}
    payload = json.dumps({"workspaceName": WORKSPACE_NAME, "dryRun": DRY_RUN, "records": records})

    response = http.request("POST", url, body=payload, headers=headers)

    # Check if the request was successful
    if response.status == 200:
        result = json.loads(response.data.decode("utf-8"))
        pprint.pprint(result)
        return result
    else:
        print(response.data.decode("utf-8"))
        raise Exception(f"Request failed with status: {response.status}")


def extract_events_from_kinesis_triggered_event(event):
    # Check if 'Records' field is in the event
    if "Records" not in event:
        raise ValueError("No 'Records' field found in the event")

    decoded_events = []

    # Loop through each record in the event
    for record in event["Records"]:
        # Check if 'kinesis' field is present in the record
        if "kinesis" not in record:
            raise ValueError("No 'kinesis' field found in record")

        # Check if 'data' field is present in the kinesis payload
        if "data" not in record["kinesis"]:
            raise ValueError("No 'data' field found in kinesis record")

        # Decode the Kinesis data (it's base64 encoded)
        kinesis_data = record["kinesis"]["data"]
        decoded_data = json.loads(base64.b64decode(kinesis_data).decode("utf-8"))

        # Collect the decoded event
        decoded_events.append(decoded_data)

    return decoded_events


def lambda_handler(event, context):
    extracted_events = extract_events_from_kinesis_triggered_event(event)
    send_events_to_tecton(extracted_events)

    # Return a success status
    return {"statusCode": 200, "body": json.dumps("Processed Kinesis records")}
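
As noted above, in production you would typically read these constants from Lambda environment variables instead of hardcoding them. A minimal sketch (the environment variable names are assumptions, not Tecton conventions):

import os

# Set these under Configuration -> Environment variables on the Lambda function
CLUSTER_NAME = os.environ["TECTON_CLUSTER_NAME"]
API_KEY = os.environ["TECTON_API_KEY"]  # ideally resolved from a secret store rather than stored in plain text
WORKSPACE_NAME = os.environ["TECTON_WORKSPACE_NAME"]
STREAM_SOURCE = os.environ["TECTON_STREAM_SOURCE"]
DRY_RUN = os.environ.get("TECTON_DRY_RUN", "false").lower() == "true"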

3e - Test your Lambda

You can invoke your Lambda with the following sample payload, which follows the format of a Kinesis-triggered Lambda event.

The data field contains a base64-encoded sample event that matches the schema of the StreamSource we defined earlier:

{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "partitionKey-123",
        "sequenceNumber": "123456789",
        "data": "eyJ0aW1lc3RhbXAiOiAiMjAyNC0xMS0xNFQxMToyMjowMy41MjAiLCAidXNlcl9pZCI6ICIxMjMiLCAiZXZlbnRfdHlwZSI6ICJteV90ZXN0In0=",
        "approximateArrivalTimestamp": 1731614529.214373
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:1234567890",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-1",
      "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
    }
  ]
}
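
If you want to check what the data field contains, or build your own test payload, you can decode it locally:

import base64
import json

data = "eyJ0aW1lc3RhbXAiOiAiMjAyNC0xMS0xNFQxMToyMjowMy41MjAiLCAidXNlcl9pZCI6ICIxMjMiLCAiZXZlbnRfdHlwZSI6ICJteV90ZXN0In0="
print(json.loads(base64.b64decode(data).decode("utf-8")))
# {'timestamp': '2024-11-14T11:22:03.520', 'user_id': '123', 'event_type': 'my_test'}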

If all goes well, you should see an output like this:

{
  "statusCode": 200,
  "body": "\"Processed Kinesis records\""
}

Step 4 - Test the System End to End​

To test that everything works as expected, ingest a few events into Kinesis and query Tecton for the results.

4a - Ingest Events into Kinesis​

You can run the following script directly in AWS CloudShell to inject a few sample events into Kinesis (use nano test.py in CloudShell to create a new test.py file). Make sure you replace the ARN of the Kinesis stream with your own:

import boto3
import json
from botocore.exceptions import ClientError
import time
from datetime import datetime

# Stream ARN provided
STREAM_ARN = "arn:aws:kinesis:us-west-2:1234567890:stream/tecton-test-stream"

# Initialize the Kinesis client with the correct region
kinesis_client = boto3.client("kinesis", region_name="us-west-2")


# Function to get the stream name from ARN
def get_stream_name_from_arn(arn):
    return arn.split("/")[-1]  # Extracts only the stream name


# Stream name extracted from ARN
stream_name = get_stream_name_from_arn(STREAM_ARN)


# Function to send JSON events to Kinesis stream
def send_event_to_kinesis(event):
    try:
        response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(event),
            PartitionKey="partition_key",  # Use a relevant partition key
        )
        print(f"Sent event to Kinesis: {response}")
    except ClientError as e:
        print(f"Failed to send event: {e}")


send_event_to_kinesis({"timestamp": datetime.now().isoformat(), "user_id": "123", "event_type": "my_test"})

Execute the Python file using python test.py and you should see something like this:

Sent event to Kinesis: {'ShardId': 'shardId-000000000001', 'SequenceNumber': '49657719691471505774136506002952224238102178837706047506', 'ResponseMetadata': {'RequestId': 'f1c00486-6ed0-a0c1-ae70-8e2d5f2c1123', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f1c00486-6ed0-a0c1-ae70-8e2d5f2c1123', 'x-amz-id-2': 'n4A81osJI08qDru94nAMLNq4+NoJ4axeiiKTH+pSuWjMC3OcQxZ4ee3uW1X6ydpzQC4MU2mgx+RrInekJrw8u0lluCqN9SuC8nYtqJEXuEc=', 'date': 'Thu, 14 Nov 2024 23:31:55 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
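
To see the three aggregation windows diverge, you can send several events spaced out over time. One way is to replace the single send_event_to_kinesis call at the bottom of test.py with a small loop (the interval is arbitrary):

# Send a few events roughly 30 seconds apart so the 1m/2m/3m counts differ
for _ in range(3):
    send_event_to_kinesis({"timestamp": datetime.now().isoformat(), "user_id": "123", "event_type": "my_test"})
    time.sleep(30)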

4b - Query Tecton’s Feature Service after you’ve Injected New Events​

Here's a sample Python script you can execute to query the FeatureService you created earlier.

Make sure you adjust the url, production workspace name, and api_key.

Also make sure you pip install tecton-client:

from tecton_client import TectonClient
import pprint

url = "https://[your_cluster].tecton.ai/"
workspace = "[your-workspace]"
api_key = "[your-api-key]"
fs_name = "demo_fs"
join_key_map = {"user_id": "123"}

client = TectonClient(url=url, default_workspace_name=workspace, api_key=api_key)

resp = client.get_features(feature_service_name=fs_name, join_key_map=join_key_map)

pprint.pprint(resp.get_features_dict())

When you run the script, you should see output like this:

{'demo_sfv.event_type_count_1m_continuous': 1,
 'demo_sfv.event_type_count_2m_continuous': 1,
 'demo_sfv.event_type_count_3m_continuous': 2}

And that's it -- you have created an end-to-end system that continuously streams events from Kinesis into a StreamSource in Tecton. You can now create as many Stream Feature Views as you would like on top of the same Kinesis stream!
