StreamSource
Summary
A Tecton StreamSource, used to unify stream and batch data into Tecton for use in a StreamFeatureView.
Example
```python
import pyspark
from tecton import KinesisConfig, HiveConfig, StreamSource
from datetime import timedelta


# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add('amount', StringType(), False)
        .add('isFraud', StringType(), False)
        .add('timestamp', StringType(), False)
    )

    return (
        df.selectExpr('cast (data as STRING) jsonData')
        .select(from_json('jsonData', payload_schema).alias('payload'))
        .select(
            col('payload.amount').cast('long').alias('amount'),
            col('payload.isFraud').cast('long').alias('isFraud'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp'),
        )
    )


# Declare a StreamSource with both a batch_config and a stream_config as parameters.
# See the API documentation for both BatchConfig and StreamConfig.
transactions_stream = StreamSource(
    name='transactions_stream',
    stream_config=KinesisConfig(
        stream_name='transaction_events',
        region='us-west-2',
        initial_stream_position='latest',
        watermark_delay_threshold=timedelta(minutes=30),
        timestamp_field='timestamp',
        post_processor=raw_data_deserialization,  # deserialization function defined above
        options={'roleArn': 'arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro'},
    ),
    batch_config=HiveConfig(
        database='demo_fraud',
        table='transactions',
        timestamp_field='timestamp',
    ),
    owner='user@tecton.ai',
    tags={'release': 'staging'},
)
```
Attributes
| Name | Data Type | Description |
|---|---|---|
| created_at | Optional[datetime.datetime] | Returns the time that this Tecton object was created or last updated. None for locally defined objects. |
| data_delay | Optional[datetime.timedelta] | Returns the duration that materialization jobs wait after the batch_schedule before starting, typically to ensure that all data has landed. |
| data_source_type | data_source_type_pb2.DataSourceType.ValueType | |
| defined_in | Optional[str] | The repo filename where this object was declared. None for locally defined objects. |
| description | Optional[str] | Returns the description of the Tecton object. |
| id | str | Returns the unique id of the Tecton object. |
| info | | |
| name | str | Returns the name of the Tecton object. |
| owner | Optional[str] | Returns the owner of the Tecton object. |
| prevent_destroy | bool | Returns whether this object has the prevent_destroy flag set. |
| tags | Dict[str, str] | Returns the tags of the Tecton object. |
| workspace | Optional[str] | Returns the workspace that this Tecton object belongs to. None for locally defined objects. |
Methods
| Name | Description |
|---|---|
| __init__(...) | Creates a new StreamSource. |
| get_dataframe(...) | Returns the data in this Data Source as a Tecton DataFrame. |
| ingest(...) | Ingests a list of events, or a single event, into the Tecton Stream Ingest API. |
| select_range(...) | Returns this DataSource object wrapped as a FilteredSource. FilteredSources automatically pre-filter sources in Feature View definitions and can reduce compute costs. |
| start_stream_preview(...) | Starts a streaming job that writes incoming records from this data source's stream to a temporary table with a given name. |
| summary() | Displays a human-readable summary. |
| unfiltered() | Returns an unfiltered DataSource. This scope makes the entire source available to a Feature View, but can increase compute costs as a result. |
| validate() | [Deprecated in SDK 1.0] This method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon object creation, so validation is unnecessary. |
__init__(...)
Creates a new StreamSource.
Parameters
- `name: str` - A unique name of the DataSource.
- `description: Optional[str] = None` - A human-readable description.
- `tags: Optional[Dict[str, str]] = None` - Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).
- `owner: Optional[str] = None` - Owner name (typically the email of the primary maintainer).
- `prevent_destroy: bool = False` - If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object, `prevent_destroy` must be set to False via the same tecton apply or a separate tecton apply. `prevent_destroy` can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs. `prevent_destroy` also blocks changes to dependent Tecton objects that would trigger a re-create of the tagged object, e.g. if `prevent_destroy` is set on a Feature Service, that will also prevent deletions or re-creates of Feature Views used in that service. `prevent_destroy` is only enforced in live (i.e. non-dev) workspaces.
- `batch_config: Optional[BatchConfigType] = None` - BatchConfig object containing the configuration of the Batch Data Source that backs this Tecton Stream Source. This field is optional only if `stream_config` is a PushConfig.
- `stream_config: StreamConfigType` - StreamConfig object containing the configuration of the Stream Data Source that backs this Tecton Stream Source.
- `options: Optional[Dict[str, str]] = None` - Additional options to configure the Source. Used for advanced use cases and beta features.
- `schema: Optional[List[types.Field]] = None` - A schema for the StreamSource. If not provided, the schema will be inferred from the underlying batch source. Currently, a schema can only be specified for StreamSources with a PushConfig, and it must be a list of Tecton types.
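For illustration, a minimal sketch of a push-based StreamSource with an explicit schema. The source name and fields are hypothetical; `PushConfig` and the `tecton.types` imports follow the Tecton SDK:

```python
from tecton import StreamSource, PushConfig
from tecton.types import Field, String, Timestamp, Int64

# A hypothetical push-based source: no batch_config is required
# because stream_config is a PushConfig, but an explicit schema is.
impressions_stream = StreamSource(
    name='impressions_stream',
    stream_config=PushConfig(),
    schema=[
        Field('user_id', String),
        Field('timestamp', Timestamp),
        Field('clicked', Int64),
    ],
    owner='user@tecton.ai',
)
```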
get_dataframe(...)
Returns the data in this Data Source as a Tecton DataFrame.
Parameters
- `start_time: Optional[datetime.datetime] = None` - The start of the time interval for which to retrieve source data. Defaults to UTC if no timezone is specified. Can only be defined if `apply_translator` is True.
- `end_time: Optional[datetime.datetime] = None` - The end of the time interval for which to retrieve source data. Defaults to UTC if no timezone is specified. Can only be defined if `apply_translator` is True.
- `apply_translator: bool = True` - If True, the transformation specified by `post_processor` will be applied to the dataframe for the data source. `apply_translator` is not applicable to batch sources configured with `spark_batch_config` because they do not have a `post_processor`.
- `compute_mode: Optional[Union[ComputeMode, str]] = None` - Compute mode to use to produce the data frame.
Returns
TectonDataFrame: A Tecton DataFrame containing the data source's raw or translated source data.
Raises
- TectonValidationError: If `apply_translator` is False but `start_time` or `end_time` filters are passed in.
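For example, a minimal sketch of previewing a slice of translated source data in a notebook; the time window shown is illustrative:

```python
from datetime import datetime

# Pull one day of translated source data as a Tecton DataFrame.
df = transactions_stream.get_dataframe(
    start_time=datetime(2023, 5, 1),
    end_time=datetime(2023, 5, 2),
)
df.to_pandas().head()
```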
ingest(...)
Ingests a list of events, or a single event, into the Tecton Stream Ingest API.
Parameters
- `events: Union[Dict[str, Any], Sequence[Dict[str, Any]]]` - A list of dictionaries representing a sequence of events to be ingested. Also accepts a single dictionary.
- `dry_run: bool = False` - If True, the ingest request will be validated, but the events will not be materialized to the online store. If False, the events will be materialized.
Returns
Dict[str, Any]
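For example, a minimal sketch of pushing a single event to the hypothetical push-based source sketched earlier; the event fields are illustrative and must match the source's schema:

```python
# Validate the request without writing to the online store by
# setting dry_run=True; set it to False to materialize the event.
response = impressions_stream.ingest(
    {
        'user_id': 'user_123',
        'timestamp': '2023-05-01T00:00:00Z',
        'clicked': 1,
    },
    dry_run=True,
)
```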
start_stream_preview(...)
Starts a streaming job that writes incoming records from this data source's stream to a temporary table with the given name.
After records have been written to the table, they can be queried using `spark.sql()`. If run in a Databricks notebook, Databricks will also automatically visualize the number of incoming records.
This is a testing method, most commonly used to verify that a StreamSource is correctly receiving streaming events. Note that the table will grow without bound, so it is only suitable for debugging in notebooks.
Parameters
- `table_name: str` - The name of the temporary table that this method will write to.
- `apply_translator: bool = True` - Whether to apply this data source's `raw_stream_translator`. When True, the translated data will be written to the table. When False, the raw, untranslated data will be written. `apply_translator` is not applicable to stream sources configured with `spark_stream_config` because they do not have a `post_processor`.
- `option_overrides: Optional[Dict[str, str]] = None` - A dictionary of Spark readStream options that will override any readStream options set by the data source. Can be used to configure behavior only for the preview, e.g. setting `startingOffsets: latest` to preview only the most recent events in a Kafka stream.
- `checkpoint_dir: Optional[str] = None` - A root directory where a temporary folder will be created and used by the streaming job for checkpointing. Primarily intended for use with Databricks Unity Catalog Shared Access Mode clusters. If specified, the environment should have write permission for the specified directory. If not provided, a temporary directory will be created using the default file system.
Returns
pyspark_streaming.StreamingQuery
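For example, a minimal sketch for verifying that events are arriving, assuming a Spark-backed notebook environment where a `spark` session is available; the table name is illustrative:

```python
# Write incoming records to a temporary table for inspection.
query = transactions_stream.start_stream_preview(table_name='transactions_preview')

# After some records have arrived, inspect them with Spark SQL.
spark.sql('SELECT * FROM transactions_preview LIMIT 10').show()

# Stop the streaming job when finished debugging.
query.stop()
```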
select_range(...)
Returns this DataSource object wrapped as a FilteredSource. FilteredSources automatically pre-filter sources in Feature View definitions and can reduce compute costs.
Calling `select_range()` with no arguments returns an unfiltered source and is equivalent to `source.unfiltered()`.
Parameters
- `start_time: Union[datetime.datetime, FilterDateTime, TectonTimeConstant]` - The start time of the filter. Can be a datetime or a TectonTimeConstant, optionally offset by a timedelta.
- `end_time: Union[datetime.datetime, FilterDateTime, TectonTimeConstant]` - The end time of the filter. Can be a datetime or a TectonTimeConstant, optionally offset by a timedelta.
Returns
FilteredSource: A FilteredSource object that can be passed into a Feature View.
Examples
```python
# The following example demonstrates how to use select_range to filter a
# source to only include data from 7 days before MATERIALIZATION_END_TIME.
@batch_feature_view(
    ...
    sources=[transactions_source.select_range(
        start_time=TectonTimeConstant.MATERIALIZATION_END_TIME - timedelta(days=7),
        end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
    )],
    ...
)
```

```python
# The following example filters the source to all data from 2020-01-01 onward.
@batch_feature_view(
    ...
    sources=[transactions_source.select_range(
        start_time=datetime.datetime(2020, 1, 1),
        end_time=TectonTimeConstant.UNBOUNDED_FUTURE,
    )],
    ...
)
```
summary()
Displays a human-readable summary.
unfiltered()
Returns an unfiltered DataSource. This scope makes the entire source available to a Feature View, but can increase compute costs as a result.
Returns
FilteredSource
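For example, following the style of the `select_range()` examples above, a sketch of passing an unfiltered source to a Feature View:

```python
# Make the entire source available to the Feature View.
@batch_feature_view(
    ...
    sources=[transactions_source.unfiltered()],
    ...
)
```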
validate()
Deprecation Warning
Deprecated in SDK 1.0. As of Tecton version 1.0, objects are validated upon object creation, so `validate()` is unnecessary.