Skip to main content
Version: 1.0

Connect to a Kinesis Stream

Overview

This example explains how you can connect Tecton to a Kinesis stream.

Kinesis is an AWS global resource, which is not associated with a specific virtual private cloud (VPC). This eliminates the need to set up networking access (Security Groups, Subnets, VPC Peering, and so on). Instead, just ensure that Tecton has AWS IAM permissions to read from the Kinesis data source. Tecton's default configuration enables it to access all Kinesis streams available in the AWS account in which Tecton is deployed.

note

See Cross-Account Kinesis Access, below, for instructions on how to connect Tecton to a Kinesis stream in an AWS account that differs from the AWS account in which Tecton is deployed.

Once Tecton has access to your Kinesis streams, you need the following information:

  • region: The AWS region in which the Kinesis stream lives (for example: us-west-1, us-east-2)
  • stream_name: The unique name of the Kinesis stream

Sample Data Source Configuration for Kinesis

The following example shows how to define a Stream Data Source with a KinesisConfig.

from tecton import (
HiveConfig,
KinesisConfig,
StreamSource,
BatchSource,
DatetimePartitionColumn,
)
from datetime import timedelta


def raw_data_deserialization(df):
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import (
StructType,
StructField,
StringType,
DoubleType,
TimestampType,
BooleanType,
IntegerType,
)

payload_schema = StructType(
[
StructField("user_id", StringType(), False),
StructField("transaction_id", StringType(), False),
StructField("category", StringType(), False),
StructField("amt", StringType(), False),
StructField("is_fraud", StringType(), False),
StructField("merchant", StringType(), False),
StructField("merch_lat", StringType(), False),
StructField("merch_long", StringType(), False),
StructField("timestamp", StringType(), False),
]
)

return (
df.selectExpr("cast (data as STRING) jsonData")
.select(from_json("jsonData", payload_schema).alias("payload"))
.select(
col("payload.user_id").alias("user_id"),
col("payload.transaction_id").alias("transaction_id"),
col("payload.category").alias("category"),
col("payload.amt").cast("double").alias("amt"),
col("payload.is_fraud").cast("long").alias("is_fraud"),
col("payload.merchant").alias("merchant"),
col("payload.merch_lat").cast("double").alias("merch_lat"),
col("payload.merch_long").cast("double").alias("merch_long"),
from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
)
)


partition_columns = [
DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]

batch_config = HiveConfig(
database="demo_fraud_v2",
table="transactions",
timestamp_field="timestamp",
datetime_partition_columns=partition_columns,
)


transactions_stream = StreamSource(
name="transactions_stream",
stream_config=KinesisConfig(
stream_name="tecton-demo-fraud-data-stream",
region="us-west-2",
initial_stream_position="latest",
watermark_delay_threshold=timedelta(hours=24),
timestamp_field="timestamp",
post_processor=raw_data_deserialization,
options={
"roleArn": "arn:aws:iam::<kinesis-owner-acct-id>:role/tecton-demo-fraud-data-cross-account-kinesis-ro"
},
),
batch_config=batch_config,
)

transactions_batch = BatchSource(
name="transactions_batch",
batch_config=batch_config,
)

Cross-Account Kinesis Access

You might need access to a Kinesis stream that's in a different AWS account than Tecton's data plane. To enable cross-account access:

  1. Create a cross-account role in the AWS account of your Kinesis stream that allows Tecton-orchestrated Spark workers to read from your Kinesis stream
  2. Configure your KinesisConfig object to use the cross-account role by setting the roleArn parameter to the AWS ARN of the cross-account IAM role

Creating a Cross-Account Role

  1. In your Kinesis AWS Account, go to the IAM service and click the Roles tab.
  2. Click Create role. In the Select type of trusted entity panel, click Another AWS Account. Paste in the Account ID of Tecton's data plane AWS account, <deployment-acct-id>. You can get this ID by emailing support@tecton.ai.
  3. Click Next: permissions and give this role permission to access Kinesis. You can provide your own JSON or use the AmazonKinesisFullAccess policy.
  4. Click Next: Review and give the role a name, for example KinesisCrossAccountRole.
  5. Click Create role. The list of roles displays.
  6. In the Roles list, click KinesisCrossAccountRole and verify that the trusted account contains a JSON policy like:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": ["arn:aws:iam::<deployment-acct-id>:root"],
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
  1. Copy the role ARN. For example: arn:aws:iam::<kinesis-owner-acct-id>:role/KinesisCrossAccountRole.

Configuring the KinesisConfig Object

Set the roleArn value in the options argument of your KinesisConfig as shown below:

stream_config = KinesisConfig(
...,
options={"roleArn": "arn:aws:iam::<kinesis-owner-acct-id>:role/KinesisCrossAccountRole"},
)

Validate Data Access

To validate that Tecton can properly access the Kinesis stream, test the stream with the Data Source's start_stream_preview function, documented here. Use the function in an interactive notebook.

Was this page helpful?