Version: 1.0

StreamFeatureView

Summary

A Tecton Stream Feature View, used for transforming and materializing features from a StreamSource.
 
The StreamFeatureView class should not be instantiated directly; use the @stream_feature_view decorator instead.

Examples

Example Using Attributes

from datetime import datetime, timedelta

from entities import user  # defined elsewhere in the feature repository
from transactions_stream import transactions_stream  # defined elsewhere in the feature repository
from tecton import Attribute, stream_feature_view
from tecton.types import Int64, String


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    ttl=timedelta(days=30),
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    features=[
        Attribute("LAST_TRANSACTION_AMOUNT", Int64),
        Attribute("LAST_TRANSACTION_CATEGORY", String),
    ],
    timestamp_field="timestamp",
    run_transformation_validation=True,
    description="Features about the user's most recent transaction in the past 30 days. Updated continuously.",
    tecton_materialization_runtime="1.0.0",
)
def user_last_transaction_features(transactions_stream):
    # Select the join key, the event timestamp, and the two attribute columns.
    return f"""
        SELECT
            USER_ID,
            TIMESTAMP,
            AMOUNT AS LAST_TRANSACTION_AMOUNT,
            CATEGORY AS LAST_TRANSACTION_CATEGORY
        FROM
            {transactions_stream}
        """

Example Using Aggregates

from datetime import datetime, timedelta

from entities import user  # defined elsewhere in the feature repository
from transactions_stream import transactions_stream  # defined elsewhere in the feature repository
from tecton import Aggregate, stream_feature_view
from tecton.types import Field, Int64


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=10),
    features=[
        Aggregate(input_column=Field("AMOUNT", Int64), function="mean", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="mean", time_window=timedelta(hours=24)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="mean", time_window=timedelta(hours=72)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="sum", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="sum", time_window=timedelta(hours=24)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="sum", time_window=timedelta(hours=72)),
    ],
    online=True,
    feature_start_time=datetime(2020, 10, 10),
    timestamp_field="timestamp",
    description="Transaction amount statistics and total over a series of time windows, updated every ten minutes.",
    tecton_materialization_runtime="1.0.0",
)
def user_recent_transaction_aggregate_features(transactions_stream):
    # Return the join key, the column to aggregate, and the event timestamp.
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """

@stream_feature_view (Decorator)

Declare a Stream Feature View. The decorator is used to instantiate a StreamFeatureView.

Parameters

  • mode (str) - (Required) Whether the annotated function is a pipeline function ("pipeline" mode) or a transformation function ("spark_sql" or "pyspark" mode). In the non-pipeline modes, an inferred transformation is also registered.

  • source (Union[framework_data_source.StreamSource, FilteredSource]) - (Required) The data source input to the feature view.

  • entities (Sequence[framework_entity.Entity]) - (Required) The entities this feature view is associated with.

  • timestamp_field (str) - (Required unless exactly one column is of Timestamp type) The column name that refers to the timestamp for records produced by the feature view.

  • features (Union[Sequence[feature.Aggregate], Sequence[feature.Attribute]]) - (Required) A list of features this feature view manages.

  • name (Optional[str]) - Unique, human-friendly name that identifies the FeatureView. Defaults to the function name. Default: None

  • description (Optional[str]) - A human-readable description. Default: None

  • owner (Optional[str]) - Owner name (typically the email of the primary maintainer). Default: None

  • tags (Optional[Dict[str, str]]) - Tags associated with this Tecton Object (key-value pairs of arbitrary metadata). Default: None

  • prevent_destroy (bool) - If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object, prevent_destroy must be set to False via the same tecton apply or a separate tecton apply. prevent_destroy can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs. prevent_destroy also blocks changes to dependent Tecton objects that would trigger a recreation of the tagged object, e.g. if prevent_destroy is set on a Feature Service, that will also prevent deletions or re-creates of Feature Views used in that service. prevent_destroy is only enforced in live (i.e. non-dev) workspaces. Default: false

  • aggregation_interval (Optional[datetime.timedelta]) - How frequently the feature value is updated (for example, timedelta(hours=1) or timedelta(hours=6)). Default: None

  • aggregation_secondary_key (Optional[str]) - Configures secondary key aggregates using the set column. Only valid when using aggregations. Default: None

  • stream_processing_mode (Optional[StreamProcessingMode]) - Whether aggregations should be "batched" in time intervals or be updated continuously. Continuously aggregated features are fresher but more expensive. One of StreamProcessingMode.TIME_INTERVAL or StreamProcessingMode.CONTINUOUS. Default: StreamProcessingMode.TIME_INTERVAL

  • online (bool) - Whether the feature view should be materialized to the online feature store. Default: false

  • offline (bool) - Whether the feature view should be materialized to the offline feature store. Default: false

  • ttl (Optional[datetime.timedelta]) - The TTL (or "look back window") for features defined by this feature view. This parameter determines how long features will live in the online store and how far to "look back" relative to a training example's timestamp when generating offline training sets. Shorter TTLs improve performance and reduce costs. Default: None

  • feature_start_time (Optional[datetime.datetime]) - The time from which materialization for this feature view should start. (Required if offline=true or online=true) Default: None

  • lifetime_start_time (Optional[datetime.datetime]) - The start time for what data should be included in a lifetime aggregate. (Required if using lifetime windows) Default: None

  • manual_trigger_backfill_end_time (Optional[datetime.datetime]) - If set, Tecton will schedule backfill materialization jobs for this feature view up to this time. Materialization jobs after this point must be triggered manually. (This parameter can only be set when batch_trigger is BatchTriggerType.MANUAL.) Default: None

  • batch_trigger (BatchTriggerType) - Defines the mechanism for initiating batch materialization jobs. One of BatchTriggerType.SCHEDULED or BatchTriggerType.MANUAL. The default value is BatchTriggerType.SCHEDULED, where Tecton will run materialization jobs based on the schedule defined by the batch_schedule parameter. If set to BatchTriggerType.MANUAL, then batch materialization jobs must be explicitly initiated by the user through either the Tecton SDK or Airflow operator. Default: None

  • batch_schedule (Optional[datetime.timedelta]) - The interval at which batch materialization should be scheduled. Default: None

  • online_serving_index (Optional[Sequence[str]]) - (Advanced) Defines the set of join keys that will be indexed and queryable during online serving. Default: None

  • batch_compute (Optional[configs.ComputeConfigTypes]) - Batch materialization cluster configuration. Default: None

  • stream_compute (Optional[configs.ComputeConfigTypes]) - Streaming materialization cluster configuration. Default: None

  • offline_store (Optional[Union[configs.OfflineStoreConfig, configs.ParquetConfig, configs.DeltaConfig]]) - Configuration for how data is written to the offline feature store. Default: None

  • online_store (Optional[configs.OnlineStoreTypes]) - Configuration for how data is written to the online feature store. Default: None

  • monitor_freshness (bool) - If true, enables monitoring when feature data is materialized to the online feature store. Default: false

  • data_quality_enabled (Optional[bool]) - If false, disables data quality metric computation and data quality dashboard. Default: None

  • skip_default_expectations (Optional[bool]) - If true, skips validating default expectations on the feature data. Default: None

  • expected_feature_freshness (Optional[datetime.timedelta]) - Threshold used to determine if recently materialized feature data is stale. Data is stale if now - most_recent_feature_value_timestamp > expected_feature_freshness. For feature views using Tecton aggregations, data is stale if now - round_up_to_aggregation_interval(most_recent_feature_value_timestamp) > expected_feature_freshness, where round_up_to_aggregation_interval() rounds the feature timestamp up to the end of its aggregation_interval. The value must be at least 2 times aggregation_interval. If not specified, a value determined by the Tecton backend is used. Default: None

  • alert_email (Optional[str]) - Email that alerts for this FeatureView will be sent to. Default: None

  • max_backfill_interval (Optional[datetime.timedelta]) - (Advanced) The time interval for which each backfill job will run to materialize feature data. This affects the number of backfill jobs that will run, which is (<feature registration time> - feature_start_time) / max_backfill_interval. Configuring the max_backfill_interval parameter appropriately will help to optimize large backfill jobs. If this parameter is not specified, then 10 backfill jobs will run (the default). Default: None

  • max_batch_aggregation_interval (Optional[datetime.timedelta]) - Deprecated. Use max_backfill_interval instead, which has the exact same usage. Default: None

  • output_stream (Optional[configs.OutputStream]) - Configuration for a stream to write feature outputs to, specified as a tecton.framework.configs.KinesisOutputStream or tecton.framework.configs.KafkaOutputStream. Default: None

  • options (Optional[Dict[str, str]]) - Additional options to configure the Feature View. Used for advanced use cases and beta features. Default: None

  • run_transformation_validation (Optional[bool]) - If True, Tecton will execute the Feature View transformations during tecton plan/apply validation. If False, then Tecton will not execute the transformations during validation and schema must be set. Skipping query validation can be useful to speed up tecton plan/apply or for Feature Views that have issues with Tecton's validation (e.g. some pip dependencies). Default is True for Spark and Snowflake Feature Views and False for Python and Pandas Feature Views or Feature Views with Push Sources. Default: None

  • tecton_materialization_runtime (Optional[str]) - Version of tecton package used by your job cluster. Default: None

  • cache_config (Optional[configs.CacheConfig]) - Cache config for the Feature View. Including this option enables the feature server to use the cache when retrieving features for this feature view. It is only respected if the feature service containing this feature view has enable_online_caching set to True. Default: None

  • stream_compaction_enabled (Optional[bool]) - Deprecated: Please use stream_tiling_enabled instead which has the exact same usage. Default: None

  • batch_compaction_enabled (Optional[bool]) - Deprecated: Please use compaction_enabled instead which has the exact same usage. Default: None

  • compaction_enabled (Optional[bool]) - (Private preview) If True, Tecton will run a compaction job after each batch materialization job to write to the online store. This requires the use of DynamoDB and uses the ImportTable API. Because each batch job overwrites the online store, a larger compute cluster may be required. This must be True if stream_compaction_enabled is True. Default: False

  • stream_tiling_enabled (Optional[bool]) - (Private preview) If False, Tecton transforms and writes all events from the stream to the online store (same as stream_processing_mode=StreamProcessingMode.CONTINUOUS). If True, Tecton will store the partial aggregations of the events in the online store. Default: False

  • environment (Optional[str]) - The custom environment in which materialization jobs will be run. Defaults to None, which means jobs will execute in the default Tecton environment. Default: None

  • context_parameter_name (Optional[str]) - Name of the function parameter into which Tecton injects the MaterializationContext object. Default: None

  • aggregation_leading_edge (Optional[AggregationLeadingEdge]) - (Advanced) Specifies the timestamp used for the leading edge of aggregation time windows. This parameter only affects online serving. See the AggregationLeadingEdge class documentation or the Tecton docs for more information. Defaults to AggregationLeadingEdge.WALL_CLOCK_TIME. Default: None
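The staleness rule for expected_feature_freshness in the parameter list above can be written out directly. This sketch assumes aggregation intervals are half-open and aligned to the Unix epoch, so rounding up means moving to the end of the interval that contains the timestamp; is_stale and this version of round_up_to_aggregation_interval are illustrative, not Tecton code.

```python
from datetime import datetime, timedelta
from typing import Optional

EPOCH = datetime(1970, 1, 1)


def round_up_to_aggregation_interval(ts: datetime, interval: timedelta) -> datetime:
    """End of the epoch-aligned interval [start, start + interval) containing ts (an assumption)."""
    return ts - (ts - EPOCH) % interval + interval


def is_stale(
    now: datetime,
    most_recent_feature_value_timestamp: datetime,
    expected_feature_freshness: timedelta,
    aggregation_interval: Optional[timedelta] = None,
) -> bool:
    reference = most_recent_feature_value_timestamp
    if aggregation_interval is not None:
        # Documented lower bound: at least 2 times aggregation_interval.
        if expected_feature_freshness < 2 * aggregation_interval:
            raise ValueError("expected_feature_freshness must be at least 2 * aggregation_interval")
        reference = round_up_to_aggregation_interval(reference, aggregation_interval)
    return now - reference > expected_feature_freshness


now = datetime(2024, 1, 1, 12, 0)
print(is_stale(now, datetime(2024, 1, 1, 11, 5), timedelta(minutes=30)))  # True
print(is_stale(now, datetime(2024, 1, 1, 11, 5), timedelta(hours=1), timedelta(minutes=10)))  # False
print(is_stale(now, datetime(2024, 1, 1, 10, 45), timedelta(hours=1), timedelta(minutes=10)))  # True
```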

Returns

An object of type StreamFeatureView.

StreamFeatureView (Class)

Attributes

| Name | Data Type | Description |
|---|---|---|
| aggregation_interval | Optional[datetime.timedelta] | How frequently the feature values are updated. |
| aggregation_secondary_key | Optional[str] | Configures secondary key aggregates using the set column. |
| aggregations | List[configs.Aggregation] | List of Aggregation configs used by this Feature View. |
| alert_email | Optional[str] | Email that alerts for this FeatureView will be sent to. |
| batch_schedule | Optional[datetime.timedelta] | The batch schedule of this Feature View. |
| batch_trigger | BatchTriggerType | The BatchTriggerType for this FeatureView. |
| cache_config | Optional[configs.CacheConfig] | Uses cache for Feature View if online caching is enabled. |
| compaction_enabled | bool | (Private preview) Runs a compaction job after materialization; requires DynamoDB and the ImportTable API. |
| context_parameter_name | Optional[str] | Name of the function parameter into which Tecton injects the MaterializationContext object. |
| created_at | Optional[datetime.datetime] | Returns the time that this Tecton object was created or last updated. None for locally defined objects. |
| defined_in | Optional[str] | The repo filename where this object was declared. None for locally defined objects. |
| description | Optional[str] | Returns the description of the Tecton object. |
| entities | | |
| environment | Optional[str] | The custom environment in which materialization jobs will be run. Defaults to None, which means jobs will execute in the default Tecton environment. |
| expected_feature_freshness | Optional[pendulum.Duration] | Threshold used to determine if recently materialized feature data is stale. |
| feature_metadata | List[FeatureMetadata] | |
| feature_start_time | Optional[datetime.datetime] | |
| id | str | Returns the unique id of the Tecton object. |
| incremental_backfills | bool | Backfills incrementally from feature_start_time to the current time, one interval at a time. |
| info | | |
| is_batch_trigger_manual | bool | Whether this Feature View's batch trigger is BatchTriggerType.MANUAL. |
| join_keys | List[str] | The join key column names. |
| manual_trigger_backfill_end_time | Optional[pendulum.datetime] | If set, Tecton will schedule backfill materialization jobs for this Feature View up to this time. |
| max_backfill_interval | Optional[pendulum.Duration] | (Advanced) The time interval for which each backfill job will run to materialize feature data. This affects the number of backfill jobs that will run, which is (<feature registration time> - feature_start_time) / max_backfill_interval. Configuring the max_backfill_interval parameter appropriately will help to optimize large backfill jobs. If this parameter is not specified, then 10 backfill jobs will run (the default). |
| max_source_data_delay | datetime.timedelta | Returns the maximum data delay of input sources for this feature view. |
| monitor_freshness | bool | If true, enables monitoring when feature data is materialized to the online feature store. |
| name | str | Returns the name of the Tecton object. |
| offline | bool | Whether the Feature View is materialized to the offline feature store. |
| offline_store | Optional[Union[configs.DeltaConfig, configs.ParquetConfig]] | Configuration for the Offline Store of this Feature View. |
| online | bool | Whether the Feature View is materialized to the online feature store. |
| online_serving_index | List[str] | The set of join keys that will be indexed and queryable during online serving. Defaults to the complete set of join keys. |
| output_stream | | Configuration for a stream to write feature outputs to, specified as a tecton.framework.configs.KinesisOutputStream or tecton.framework.configs.KafkaOutputStream. |
| owner | Optional[str] | Returns the owner of the Tecton object. |
| prevent_destroy | bool | If set to True, Tecton will block destructive actions taken on this Feature View or Feature Table. |
| published_features_path | Optional[str] | The location of published features in the offline store. |
| sources | | |
| stream_tiling_enabled | bool | (Private preview) If False, Tecton transforms and writes all events from the stream to the online store (same as stream_processing_mode=StreamProcessingMode.CONTINUOUS). If True, Tecton will store the partial aggregations of the events in the online store. Defaults to False. |
| tags | Dict[str, str] | Returns the tags of the Tecton object. |
| tecton_materialization_runtime | Optional[str] | Version of tecton package used by your job cluster. |
| timestamp_field | Optional[str] | Timestamp column name for records from the feature view. |
| transformations | | |
| ttl | Duration | The TTL (or look back window) for features defined by this Feature View. This parameter determines how long features will live in the online store and how far to look back relative to a training example's timestamp when generating offline training sets. |
| url | str | Returns a link to the Tecton Web UI. |
| wildcard_join_key | Optional[set] | Returns a wildcard join key column name if it exists; otherwise returns None. |
| workspace | Optional[str] | Returns the workspace that this Tecton object belongs to. None for locally defined objects. |
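The backfill formula given for max_backfill_interval is plain timedelta arithmetic. A sketch, assuming that a partial final interval still needs its own job (hence math.ceil); backfill_job_count is a hypothetical helper, and the sketch does not model the 10-job default used when the parameter is unset.

```python
import math
from datetime import datetime, timedelta


def backfill_job_count(
    registration_time: datetime,
    feature_start_time: datetime,
    max_backfill_interval: timedelta,
) -> int:
    """(registration time - feature_start_time) / max_backfill_interval, rounded up (an assumption)."""
    if max_backfill_interval <= timedelta(0):
        raise ValueError("max_backfill_interval must be positive")
    # Dividing one timedelta by another yields a float.
    return math.ceil((registration_time - feature_start_time) / max_backfill_interval)


print(backfill_job_count(datetime(2024, 1, 31), datetime(2024, 1, 1), timedelta(days=10)))  # 3
print(backfill_job_count(datetime(2024, 1, 31), datetime(2024, 1, 1), timedelta(days=7)))   # 5
```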

Methods

| Name | Description |
|---|---|
| cancel_materialization_job(...) | Cancels the scheduled or running job by the job identifier. |
| delete_keys(...) | Deletes any materialized data that matches the specified join keys from the FeatureView. |
| get_feature_columns() | Retrieves the list of feature columns produced by this FeatureView. |
| get_features_for_events(...) | Returns a TectonDataFrame of historical values for this feature view. |
| get_features_in_range(...) | Returns a TectonDataFrame of historical values for this feature view which were valid within the input time range. |
| get_historical_features(...) | Returns a TectonDataFrame of historical values for this feature view. |
| get_job(...) | Retrieves data about the specified job (materialization or dataset generation). |
| get_materialization_job(...) | Retrieves data about the specified materialization job. |
| get_online_features(...) | Returns a single Tecton tecton.FeatureVector from the Online Store. |
| get_partial_aggregates(...) | Returns the partially aggregated tiles in between start_time and end_time for a Feature View that uses the Tecton Aggregation Engine. |
| get_timestamp_field() | Returns the name of the timestamp field for this Feature View. |
| list_jobs() | Retrieves the list of all jobs (materialization and dataset generation) for this Feature View or Feature Table. |
| list_materialization_jobs() | Retrieves the list of all materialization jobs for this feature view. |
| materialization_status(...) | Displays materialization information for the FeatureView, which may include past jobs, scheduled jobs, and job failures. |
| print_transformation_schema() | Prints the schema of the output of the transformation. |
| run(...) | Run the FeatureView. Supports transforming data directly from raw data sources or using mock data. |
| run_stream(...) | Starts a streaming job to keep writing the output records of this FeatureView to a temporary table. |
| run_transformation(...) | Run the FeatureView Transformation as is without any aggregations or joins. Supports transforming data directly from raw data sources or using mock data. |
| summary() | Displays a human-readable summary. |
| test_run(...) | Run the FeatureView using mock data sources. This requires a local spark session. |
| transformation_schema() | Returns the schema of the output of the transformation. |
| trigger_materialization_job(...) | Starts a batch materialization job for this Feature View. |
| validate() | Method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon object creation, so validation is unnecessary. |
| wait_for_materialization_job(...) | Blocks until the specified job has been completed. |
| with_join_key_map(...) | Rebind join keys for a Feature View or Feature Table used in a Feature Service. |
| with_name(...) | Rename a Feature View or Feature Table used in a Feature Service. |

cancel_materialization_job(...)

Cancels the scheduled or running job by the job identifier. Once cancelled, a job will not be retried further.

The job run state will be set to MANUAL_CANCELLATION_REQUESTED. Cancellation is asynchronous, so it may take some time to complete. If the job run is already in the MANUAL_CANCELLATION_REQUESTED state or in a terminal state, the method returns the job.

Parameters

  • job_id (str) - ID string of the materialization job.

Returns

MaterializationJob: JobData object for the cancelled job.

delete_keys(...)

Deletes any materialized data that matches the specified join keys from the FeatureView.

This method kicks off a job to delete the data in the offline and online stores. If a FeatureView has multiple entities, the full set of join keys must be specified. Only Delta is supported as the offline store (offline_store=DeltaConfig()). At most 500,000 keys can be deleted per request.

Parameters

  • keys (Union[pyspark_dataframe.DataFrame, pandas.DataFrame]) - A DataFrame of the join key values whose data should be deleted. Must conform to the FeatureView join keys.

  • online (bool) - Whether or not to delete from the online store. Default: true

  • offline (bool) - Whether or not to delete from the offline store. Default: true

Returns

List[str]: List of job ids for jobs created for entity deletion.
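Since delete_keys accepts at most 500,000 keys per request and needs the full set of join keys, a large deletion has to be split up front. A standard-library sketch of that validation and batching; batch_keys is hypothetical, and the commented-out fv.delete_keys call assumes an existing Feature View object fv and that each batch is first converted to a pandas DataFrame.

```python
from typing import Dict, Iterable, Iterator, List, Sequence

MAX_KEYS_PER_REQUEST = 500_000  # per-request limit stated for delete_keys


def batch_keys(
    rows: Iterable[Dict[str, object]],
    join_keys: Sequence[str],
    max_per_request: int = MAX_KEYS_PER_REQUEST,
) -> Iterator[List[Dict[str, object]]]:
    """Yield batches of join-key rows, each no larger than max_per_request."""
    batch: List[Dict[str, object]] = []
    for row in rows:
        # Every row must carry the full set of join keys.
        missing = set(join_keys).difference(row)
        if missing:
            raise ValueError(f"row is missing join keys: {sorted(missing)}")
        # Keep only the join key columns.
        batch.append({key: row[key] for key in join_keys})
        if len(batch) == max_per_request:
            yield batch
            batch = []
    if batch:
        yield batch


rows = [{"user_id": i, "note": "x"} for i in range(5)]
for batch in batch_keys(rows, ["user_id"], max_per_request=2):
    # In a feature repository, each batch might be submitted as:
    # fv.delete_keys(pandas.DataFrame(batch), online=True, offline=True)
    print(len(batch))  # prints 2, then 2, then 1
```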

get_feature_columns()

Retrieves the list of feature columns produced by this FeatureView.

Returns

List[str]: The features produced by this FeatureView.

get_features_for_events(...)

Note: This method is functionally equivalent to get_historical_features(spine) and has been renamed in Tecton 0.8 for clarity. get_historical_features() is planned to be deprecated in a future release.

Returns a TectonDataFrame of historical values for this feature view.
 
By default (i.e. from_source=None), this method fetches feature values from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data.

Parameters

  • events (Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame, str]) - A DataFrame containing all possible join key combinations and timestamps specifying which feature values to fetch. The returned DataFrame includes rollups for all (join key, timestamp) combinations necessary to compute a complete dataset. To differentiate between event columns and feature columns, feature columns are labeled as feature_view_name.feature_name in the returned DataFrame.

  • timestamp_key (Optional[str]) - Name of the time column in the events DataFrame. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified, will default to the time column of the events DataFrame if there is only one present. If more than one time column is present in the events DataFrame, you must specify which column you would like to use. Default: None

  • from_source (Optional[bool]) - Whether feature values should be recomputed from the original data source. If None, feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computed on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any Feature Views are not materialized. Default: None

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Valid examples include spark and rift. Default: None
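The timestamp_key default above (use the only time column, otherwise require an explicit choice) is a small resolution rule. A sketch over a plain column-name-to-type mapping standing in for the events DataFrame schema; resolve_timestamp_key is hypothetical and not part of the Tecton SDK.

```python
from datetime import datetime
from typing import Dict, Optional


def resolve_timestamp_key(column_types: Dict[str, type], timestamp_key: Optional[str] = None) -> str:
    """Hypothetical: pick the events' time column following the documented default."""
    if timestamp_key is not None:
        if timestamp_key not in column_types:
            raise ValueError(f"{timestamp_key!r} is not a column of the events DataFrame")
        return timestamp_key
    time_columns = [name for name, dtype in column_types.items() if issubclass(dtype, datetime)]
    if len(time_columns) == 1:
        return time_columns[0]
    if not time_columns:
        raise ValueError("the events DataFrame has no time column")
    raise ValueError(f"multiple time columns {time_columns}; pass timestamp_key explicitly")


print(resolve_timestamp_key({"user_id": int, "date": datetime}))  # date
print(resolve_timestamp_key({"user_id": int, "date_1": datetime, "date_2": datetime}, "date_1"))  # date_1
```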

Returns

TectonDataFrame: A TectonDataFrame.

Examples

A FeatureView fv with join key user_id.

  1. fv.get_features_for_events(events) where events=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]}): Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the events DataFrame.

  2. fv.get_features_for_events(events, save_as='my_dataset') where events=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]}): Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the events DataFrame. Save the DataFrame as a dataset named my_dataset.

  3. fv.get_features_for_events(events, timestamp_key='date_1') where events=pandas.DataFrame({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'date_2': [datetime(...), datetime(...), datetime(...)]}): Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the date_1 column of the events DataFrame.
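Each of these calls performs a point-in-time join: every event row receives the latest feature values computed before its timestamp, under columns labeled feature_view_name.feature_name. A standard-library sketch of those semantics for a single join key; the data layout, point_in_time_join, and the reading of "before" as strictly earlier are assumptions, and the sketch ignores ttl.

```python
from bisect import bisect_left
from datetime import datetime
from typing import Dict, List, Sequence, Tuple

# (feature timestamp, join key value, feature values)
FeatureRow = Tuple[datetime, object, Dict[str, object]]


def point_in_time_join(
    events: List[Dict[str, object]],
    feature_rows: List[FeatureRow],
    fv_name: str,
    feature_names: Sequence[str],
    join_key: str,
    timestamp_key: str,
) -> List[Dict[str, object]]:
    # Per join key: parallel lists of sorted feature timestamps and feature values.
    index: Dict[object, Tuple[List[datetime], List[Dict[str, object]]]] = {}
    for ts, key, features in sorted(feature_rows, key=lambda row: row[0]):
        times, values = index.setdefault(key, ([], []))
        times.append(ts)
        values.append(features)

    output = []
    for event in events:
        times, values = index.get(event[join_key], ([], []))
        # First position with timestamp >= event time; the row before it is the
        # latest value computed strictly before the event.
        pos = bisect_left(times, event[timestamp_key])
        latest = values[pos - 1] if pos > 0 else {}
        row = dict(event)
        for name in feature_names:
            row[f"{fv_name}.{name}"] = latest.get(name)
        output.append(row)
    return output


feature_rows = [
    (datetime(2024, 1, 1), 1, {"LAST_TRANSACTION_AMOUNT": 50}),
    (datetime(2024, 1, 5), 1, {"LAST_TRANSACTION_AMOUNT": 75}),
    (datetime(2024, 1, 3), 2, {"LAST_TRANSACTION_AMOUNT": 20}),
]
events = [
    {"user_id": 1, "date": datetime(2024, 1, 4)},
    {"user_id": 1, "date": datetime(2024, 1, 6)},
    {"user_id": 2, "date": datetime(2024, 1, 2)},
]
result = point_in_time_join(
    events, feature_rows, "user_last_transaction_features", ["LAST_TRANSACTION_AMOUNT"], "user_id", "date"
)
col = "user_last_transaction_features.LAST_TRANSACTION_AMOUNT"
print([row[col] for row in result])  # [50, 75, None]
```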

get_features_in_range(...)

Returns a TectonDataFrame of historical values for this feature view which were valid within the input time range.
 
A feature value is considered to be valid at a specific point in time if the Online Store would have returned that value if queried at that moment in time.
 
The DataFrame returned by this method contains the following:
  • Entity Join Key Columns
  • Feature Value Columns
  • The columns _valid_from and _valid_to that specify the time range for which the row of feature values is valid. The time range defined by [_valid_from, _valid_to) will never intersect with any other rows for the same join keys.
    • _valid_from (Inclusive) is the timestamp from which feature values were valid and returned from the Online Feature Store for the corresponding set of join keys. _valid_from will never be less than start_time. Values for which _valid_from is equal to start_time may have been valid prior to start_time.
    • _valid_to (Exclusive) is the timestamp from which feature values are invalid and no longer returned from the Online Feature Store for the corresponding set of join keys. _valid_to will never be greater than end_time. Values for which _valid_to is equal to end_time may be valid beyond end_time.

By default (i.e. from_source=None), this method fetches feature values from the Offline Store for Feature Views that have offline materialization enabled. Otherwise, this method computes feature values directly from the original data source.
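Because the [_valid_from, _valid_to) ranges for one set of join keys never intersect, the range output can be read like a step function. A minimal sketch over rows shaped like the output described above, using plain dicts instead of a TectonDataFrame; check_no_overlap and value_at are hypothetical helpers.

```python
from datetime import datetime
from typing import Dict, List, Optional


def check_no_overlap(rows: List[Dict[str, object]], join_key: str) -> None:
    """Raise if two [_valid_from, _valid_to) ranges for the same join key intersect."""
    by_key: Dict[object, List[Dict[str, object]]] = {}
    for row in rows:
        by_key.setdefault(row[join_key], []).append(row)
    for key, group in by_key.items():
        group.sort(key=lambda row: row["_valid_from"])
        for prev, cur in zip(group, group[1:]):
            # Half-open ranges may touch (prev end == cur start) but must not overlap.
            if cur["_valid_from"] < prev["_valid_to"]:
                raise ValueError(f"overlapping validity ranges for {join_key}={key}")


def value_at(rows: List[Dict[str, object]], join_key: str, key: object, t: datetime, feature: str) -> Optional[object]:
    """Value served for `key` at time t according to the range rows, or None if not covered."""
    for row in rows:
        if row[join_key] == key and row["_valid_from"] <= t < row["_valid_to"]:
            return row[feature]
    return None


rows = [
    {"user_id": 1, "AMOUNT": 50, "_valid_from": datetime(2024, 1, 1), "_valid_to": datetime(2024, 1, 5)},
    {"user_id": 1, "AMOUNT": 75, "_valid_from": datetime(2024, 1, 5), "_valid_to": datetime(2024, 1, 10)},
]
check_no_overlap(rows, "user_id")
print(value_at(rows, "user_id", 1, datetime(2024, 1, 5), "AMOUNT"))   # 75
print(value_at(rows, "user_id", 1, datetime(2024, 1, 10), "AMOUNT"))  # None
```

value_at returns None at the end of the last range only because the rows stop there; a value whose _valid_to equals end_time may still be valid afterward.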

Parameters

  • start_time (datetime.datetime) - The inclusive start time of the time range to compute features for.

  • end_time (datetime.datetime) - The exclusive end time of the time range to compute features for.

  • entities (Optional[Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame]]) - A DataFrame that is used to filter down feature values. If specified, this DataFrame should only contain join key columns. Default: None

  • max_lookback (Optional[datetime.timedelta]) - [Non-Aggregate Feature Views Only] A performance optimization that configures how far back before start_time to look for events in the raw data. If set, get_features_in_range() may not include all entities with valid feature values in the specified time range, but get_features_in_range() will never return invalid values. Default: None

  • from_source (Optional[bool]) - Whether feature values should be recomputed from the original data source. If None, feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computed on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any Feature Views are not materialized. Default: None

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Examples include spark and rift. Default: None

Returns

TectonDataFrame: A TectonDataFrame with Feature Values for the requested time range in the format specified above.

get_historical_features(...)

Note: The ability to run get_historical_features as part of a unit test was added in SDK 0.7. To use this, provide the mocked data sources in the mock_inputs parameter in a test that is run via tecton test or pytest.

Returns a TectonDataFrame of historical values for this feature view.
 
By default (i.e. from_source=None), this method fetches feature values from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data.
 
If no arguments are passed in, all feature values for this feature view will be returned in a Tecton DataFrame.
 
Note: The timestamp_key parameter is only applicable when a spine is passed in. Parameters start_time, end_time, and entities are only applicable when a spine is not passed in.
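The note above about which arguments apply with and without a spine can be expressed as an argument check. A sketch only: check_historical_args is hypothetical, and the source does not say whether the SDK raises, warns, or ignores mismatched arguments, so raising ValueError here is an assumption.

```python
from datetime import datetime
from typing import Any, Optional


def check_historical_args(
    spine: Any = None,
    timestamp_key: Optional[str] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    entities: Any = None,
) -> str:
    """Hypothetical check of the spine / time-range parameter rules; returns the query style."""
    if spine is not None:
        range_args = {"start_time": start_time, "end_time": end_time, "entities": entities}
        passed = [name for name, value in range_args.items() if value is not None]
        if passed:
            raise ValueError(f"{', '.join(passed)} only apply when no spine is passed")
        return "spine"
    if timestamp_key is not None:
        raise ValueError("timestamp_key only applies when a spine is passed")
    return "time_range"


print(check_historical_args(spine=[{"user_id": 1}], timestamp_key="date"))  # spine
print(check_historical_args(start_time=datetime(2024, 1, 1)))  # time_range
```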

Parameters

  • spine (Optional[Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame, str]]) - The spine to join against, as a dataframe. If present, the returned DataFrame will contain rollups for all (join key, temporal key) combinations that are required to compute a full frame from the spine. To distinguish between spine columns and feature columns, feature columns are labeled as feature_view_name.feature_name in the returned DataFrame. If spine is not specified, it'll return a DataFrame of feature values in the specified time range. Default: None

  • timestamp_key (Optional[str]) - Name of the time column in the spine. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified, will default to the time column of the spine if there is only one present. If more than one time column is present in the spine, you must specify which column you'd like to use. Default: None

  • start_time (Optional[datetime.datetime]) - The interval start time from when we want to retrieve features. If no timezone is specified, will default to using UTC. Default: None

  • end_time (Optional[datetime.datetime]) - The interval end time until when we want to retrieve features. If no timezone is specified, will default to using UTC. Default: None

  • entities (Optional[Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame]]) - Filter feature data returned to a set of entity IDs. If specified, this DataFrame should only contain join key columns. Default: None

  • from_source (Optional[bool]) - Whether feature values should be recomputed from the original data source. If None, feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computed on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any Feature Views are not materialized. Default: None

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Valid values include spark and rift. Default: None
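The three-way behavior of from_source can be summarized as a small decision function. This is a minimal sketch of the documented rules for a single Feature View, not Tecton's implementation; the function name resolve_read_path and its arguments are hypothetical, and is_offline_materialized stands in for "the Feature View has offline materialization enabled".

```python
from typing import Optional


def resolve_read_path(from_source: Optional[bool], is_offline_materialized: bool) -> str:
    """Hypothetical helper (not part of the Tecton SDK) mirroring the documented
    from_source rules for one Feature View.

    Returns "offline_store" or "compute_from_source".
    """
    if from_source is True:
        # Force recomputation from the raw data sources.
        return "compute_from_source"
    if from_source is False:
        # Never recompute: fail if the Feature View is not materialized offline.
        if not is_offline_materialized:
            raise ValueError("from_source=False requires offline materialization")
        return "offline_store"
    # from_source is None: use the Offline Store when available, else compute on the fly.
    return "offline_store" if is_offline_materialized else "compute_from_source"


print(resolve_read_path(None, is_offline_materialized=False))  # compute_from_source
```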

Returns

TectonDataFrame: A TectonDataFrame.

Examples

The examples below assume a Feature View fv with join key user_id.

  1. fv.get_historical_features(spine) where spine=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]})
     Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the spine.

  2. fv.get_historical_features(spine, save_as='my_dataset') where spine=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]})
     Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the spine, and save the resulting DataFrame as a dataset named my_dataset.

  3. fv.get_historical_features(spine, timestamp_key='date_1') where spine=pandas.DataFrame({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'date_2': [datetime(...), datetime(...), datetime(...)]})
     Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the date_1 column of the spine.

  4. fv.get_historical_features(start_time=datetime(...), end_time=datetime(...))
     Fetch all historical features from the offline store in the time range specified by start_time and end_time.
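The timestamp_key behavior (the latest features computed before each spine timestamp) is a point-in-time join. The sketch below shows the idea on plain Python tuples; it is an illustration under stated assumptions, not how Tecton executes the query. It assumes a strictly-earlier comparison and ignores details such as ttl, and the function name point_in_time_lookup is hypothetical.

```python
from datetime import datetime
from typing import List, Optional, Tuple

# Hypothetical feature rows: (user_id, feature_timestamp, feature_value).
FeatureRow = Tuple[int, datetime, float]
# Hypothetical spine rows: (user_id, spine_timestamp).
SpineRow = Tuple[int, datetime]


def point_in_time_lookup(spine: List[SpineRow], features: List[FeatureRow]) -> List[Optional[float]]:
    """For each spine row, return the most recent feature value for the same user_id
    whose timestamp is strictly earlier than the spine timestamp (None if none exists)."""
    results: List[Optional[float]] = []
    for user_id, spine_ts in spine:
        best_ts: Optional[datetime] = None
        best_value: Optional[float] = None
        for key, feature_ts, value in features:
            # Only rows for this entity that were computed before the spine timestamp qualify.
            if key == user_id and feature_ts < spine_ts:
                if best_ts is None or feature_ts > best_ts:
                    best_ts, best_value = feature_ts, value
        results.append(best_value)
    return results


features = [
    (1, datetime(2022, 5, 1), 10.0),
    (1, datetime(2022, 5, 3), 30.0),
    (2, datetime(2022, 5, 2), 20.0),
]
spine = [
    (1, datetime(2022, 5, 2)),  # sees only the 2022-05-01 row
    (1, datetime(2022, 5, 4)),  # sees the 2022-05-03 row
    (2, datetime(2022, 5, 1)),  # no earlier row for user 2
    (3, datetime(2022, 5, 5)),  # unknown user
]
print(point_in_time_lookup(spine, features))  # [10.0, 30.0, None, None]
```

The nested scan keeps the logic obvious; a real implementation would sort by timestamp and use an as-of merge instead.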

get_job(...)

Retrieves data about the specified job (materialization or dataset generation).

Parameters

  • job_id (str) - ID string of the job.

Returns

TectonJob: JobData object for the job.

get_materialization_job(...)

Retrieves data about the specified materialization job.

Parameters

  • job_id (str) - ID string of the materialization job.

Returns

MaterializationJob: MaterializationJobData object for the job.

get_online_features(...)

Returns a single tecton.FeatureVector from the Online Store.

Parameters

  • join_keys (Mapping[str, Union[int, numpy.int_, str, bytes]]) - The join keys to fetch from the online store.

  • include_join_keys_in_response (bool) - Whether to include join keys as part of the response FeatureVector. Default: false

Returns

FeatureVector: A tecton.FeatureVector of the results.

Examples

The examples below assume a Feature View fv with join key user_id.

  1. fv.get_online_features(join_keys={'user_id': 1})
     Fetch the latest features from the online store for user 1.

  2. fv.get_online_features(join_keys={'user_id': 1}, include_join_keys_in_response=True)
     Fetch the latest features from the online store for user 1 and include the join key information (user_id=1) in the returned FeatureVector.

get_partial_aggregates(...)

Returns the partially aggregated tiles between start_time and end_time for a Feature View that uses the Tecton Aggregation Engine.

Parameters

  • start_time (datetime.datetime) - The start time of the time window to fetch partial aggregates for. The start_time will fall in the time interval of the first tile that is returned.

  • end_time (datetime.datetime) - The end time of the time window to fetch partial aggregates for. The end_time will fall in the time interval of the last tile that is returned.

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For feature views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None

  • entities (Optional[Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame]]) - A DataFrame that is used to filter down feature values. If specified, this DataFrame should only contain join key columns. Default: None

  • from_source (Optional[bool]) - Whether feature values should be recomputed from the original data source. If None, input feature values are fetched from the Offline Store for Feature Views that have offline materialization enabled and are otherwise computed on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any input Feature Views are not materialized. Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Default: None

Returns

TectonDataFrame: A Tecton DataFrame with partially aggregated feature values and the _interval_start_time and _interval_end_time columns for each partial aggregation.
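Which tiles such a call covers can be reasoned about with simple interval arithmetic. This is a minimal sketch assuming tiles are the size of aggregation_interval, half-open, and aligned to the Unix epoch (Tecton's actual alignment is not specified on this page); each (start, end) tuple plays the role of one row's _interval_start_time and _interval_end_time. All function names are hypothetical, and naive datetimes are treated as UTC to match the UTC default described for start_time and end_time elsewhere on this page.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def as_utc(t: datetime) -> datetime:
    """Treat naive datetimes as UTC; convert aware ones to UTC."""
    return t.replace(tzinfo=timezone.utc) if t.tzinfo is None else t.astimezone(timezone.utc)


def tile_start(t: datetime, interval: timedelta) -> datetime:
    """Floor t to the start of its tile, assuming tiles are aligned to the Unix epoch."""
    return EPOCH + ((as_utc(t) - EPOCH) // interval) * interval


def tile_intervals(start_time: datetime, end_time: datetime, interval: timedelta) -> List[Tuple[datetime, datetime]]:
    """Return half-open [tile_start, tile_end) intervals such that start_time falls in
    the first tile and end_time falls in the last tile."""
    tiles = []
    current = tile_start(start_time, interval)
    last = tile_start(end_time, interval)
    while current <= last:
        tiles.append((current, current + interval))
        current += interval
    return tiles


for start, end in tile_intervals(datetime(2022, 5, 1, 0, 5), datetime(2022, 5, 1, 0, 25), timedelta(minutes=10)):
    print(start.time(), end.time())
# 00:00:00 00:10:00
# 00:10:00 00:20:00
# 00:20:00 00:30:00
```

Under the half-open convention assumed here, an end_time that lands exactly on a tile boundary belongs to the tile that starts at that boundary.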

get_timestamp_field()

Returns the name of the timestamp field for this Feature View.

Returns

str

list_jobs()

Retrieves the list of all jobs (materialization and dataset generation) for this Feature View or Feature Table.

Returns

List[TectonJob]: List of JobData objects.

list_materialization_jobs()

Retrieves the list of all materialization jobs for this feature view.

Returns

List[MaterializationJob]: List of MaterializationJobData objects.

materialization_status()

Displays materialization information for the FeatureView, which may include past jobs, scheduled jobs, and job failures.
 
This method returns different information depending on the type of FeatureView.

Parameters

  • verbose (bool) - If set to true, the method displays additional low-level materialization information, useful for debugging. Default: false

  • limit (int) - Maximum number of jobs to return. Default: 1000

  • sort_columns (Optional[str]) - A comma-separated list of column names by which to sort the rows. Default: None

  • errors_only (bool) - If set to true, the method returns only jobs that failed with an error. Default: false

Prints the schema of the output of the transformation.

Returns

None

run(...)


This method has been replaced by the .run_transformation() method and will be deprecated in a future release.

Run the FeatureView. Supports transforming data directly from raw data sources or using mock data.
 
To run the feature view with data from raw data sources, the environment must have access to the data sources.

Parameters

  • start_time (datetime.datetime) - The start time of the time window to materialize.

  • end_time (datetime.datetime) - The end time of the time window to materialize.

  • aggregation_level (Optional[str]) - For feature views with aggregations, aggregation_level configures which stage of the aggregation to run up to. The query for Aggregate Feature Views operates in three steps:
     
    1. The feature view query is run over the provided time range. The user defined transformations are applied over the data source.
       
    2. The result of #1 is aggregated into tiles the size of the aggregation_interval.
       
    3. The tiles from #2 are combined to form the final feature values. The number of tiles that are combined is based on the time_window of the aggregation.
       
      For testing and debugging purposes, to see the output of #1, use aggregation_level="disabled". For #2, use aggregation_level="partial". For #3, use aggregation_level="full". aggregation_level="full" is the default behavior if aggregation_level is not explicitly specified.
    Default: None

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Default: None

  • mock_inputs_kwargs (Union[pandas.DataFrame, pyspark_dataframe.DataFrame]) - Keyword arguments for mock sources that should be used instead of fetching directly from raw data sources. The keys should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False.

Returns

TectonDataFrame: A tecton DataFrame of the results.
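The three aggregation_level stages can be pictured with the plain-Python sketch below for a mean aggregation. It is an illustration under assumptions (epoch-aligned tiles, a window ending on a tile boundary, sum/count partials), not Tecton's implementation, and the function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

EPOCH = datetime(1970, 1, 1)  # tiles are assumed to be aligned to the (naive) Unix epoch

# Stage 1 ("disabled"): the transformed rows themselves, here (timestamp, amount) pairs.
Row = Tuple[datetime, float]


def partial_aggregates(rows: List[Row], interval: timedelta) -> Dict[datetime, Tuple[float, int]]:
    """Stage 2 ("partial"): bucket rows into tiles of size `interval`.

    Each tile keeps (sum, count) rather than a mean: means of different tiles
    cannot be averaged directly, but sums and counts can be added."""
    tiles: Dict[datetime, list] = defaultdict(lambda: [0.0, 0])
    for ts, amount in rows:
        start = EPOCH + ((ts - EPOCH) // interval) * interval
        tiles[start][0] += amount
        tiles[start][1] += 1
    return {start: (total, count) for start, (total, count) in sorted(tiles.items())}


def full_mean(
    tiles: Dict[datetime, Tuple[float, int]], interval: timedelta, time_window: timedelta, as_of: datetime
) -> Optional[float]:
    """Stage 3 ("full"): combine the tiles lying entirely inside [as_of - time_window, as_of)."""
    window_start = as_of - time_window
    total, count = 0.0, 0
    for start, (tile_sum, tile_count) in tiles.items():
        if window_start <= start and start + interval <= as_of:
            total += tile_sum
            count += tile_count
    return total / count if count else None


day = datetime(2022, 5, 1)
rows = [
    (day + timedelta(minutes=1), 10.0),
    (day + timedelta(minutes=5), 20.0),
    (day + timedelta(minutes=12), 30.0),
    (day + timedelta(minutes=55), 40.0),
]
ten_min = timedelta(minutes=10)
tiles = partial_aggregates(rows, ten_min)  # tiles starting at 00:00, 00:10 and 00:50
print(full_mean(tiles, ten_min, timedelta(hours=1), day + timedelta(hours=1)))  # 25.0
print(full_mean(tiles, ten_min, timedelta(minutes=30), day + timedelta(hours=1)))  # 40.0
```

Keeping (sum, count) per tile is what lets windows of different lengths reuse the same tiles.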

run_stream(...)

Starts a streaming job that continuously writes the output records of this FeatureView to a temporary table.
 
The job runs until execution is terminated.
 
After records have been written to the table, they can be queried using spark.sql(). If run in a Databricks notebook, Databricks also automatically visualizes the number of incoming records.

Parameters

  • output_temp_table (str) - The name of the temporary table to write to.

  • checkpoint_dir (Optional[str]) - A root directory that the streaming job will checkpoint to. Default: None

Returns

streaming.StreamingQuery

Example

  1. fv.run_stream(output_temp_table="temp_table")
     Start a streaming job.

  2. display(spark.sql("SELECT * FROM temp_table LIMIT 5"))
     Query the output table and display the resulting DataFrame.

run_transformation(...)

Run the FeatureView Transformation as is without any aggregations or joins. Supports transforming data directly from raw data sources or using mock data.
 
To run the feature view transformation with data from raw data sources, the environment must have access to the data sources.

Parameters

  • start_time (datetime.datetime) - The start time of the time window to materialize.

  • end_time (datetime.datetime) - The end time of the time window to materialize.

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Default: None

Returns

TectonDataFrame: A tecton DataFrame of the results.

Example

import tecton
import pandas
from datetime import datetime

# Example running a non-aggregate Feature View with mock data.
feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_feature_view")

mock_fraud_user_data = pandas.DataFrame(
    {
        "user_id": ["user_1", "user_2", "user_3"],
        "timestamp": [datetime(2022, 5, 1, 0), datetime(2022, 5, 1, 2), datetime(2022, 5, 1, 5)],
        "credit_card_number": [1000, 4000, 5000],
    }
)

# `fraud_users_batch` is the name of this Feature View's data source parameter.
result = feature_view.run_transformation(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    mock_inputs={"fraud_users_batch": mock_fraud_user_data},
)

# Example running an aggregate Feature View with real data.
aggregate_feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_aggregate_feature_view")

result = aggregate_feature_view.run_transformation(start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2))

summary()

Displays a human-readable summary.

test_run(...)

Run the FeatureView using mock data sources. This requires a local Spark session.
 
Unlike run, test_run is intended for unit testing. It will not make calls to your connected Tecton cluster or attempt to read real data.

Parameters

  • start_time (datetime.datetime) - The start time of the time window to materialize.

  • end_time (datetime.datetime) - The end time of the time window to materialize.

  • aggregation_level (Optional[str]) - For feature views with aggregations, aggregation_level configures which stage of the aggregation to run up to. The query for Aggregate Feature Views operates in three logical steps:
    1. The feature view query is run over the provided time range. The user defined transformations are applied over the data source.
       
    2. The result of #1 is aggregated into tiles the size of the aggregation_interval.
       
    3. The tiles from #2 are combined to form the final feature values. The number of tiles that are combined is based on the time_window of the aggregation.
       
      For testing and debugging purposes, to see the output of #1, use aggregation_level="disabled". For #2, use aggregation_level="partial". For #3, use aggregation_level="full".
    Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Default: None

  • mock_inputs (Union[pandas.DataFrame, pyspark_dataframe.DataFrame]) - Keyword arguments whose keys match the FeatureView's input parameter names. Each input name maps to a Spark DataFrame that should be evaluated for that node in the pipeline.

Returns

TectonDataFrame: A tecton.TectonDataFrame object.

transformation_schema()

Returns the schema of the output of the transformation.

Returns

List[types.Field]

trigger_materialization_job(...)

Starts a batch materialization job for this Feature View.

Parameters

  • start_time (datetime.datetime) - The job will materialize feature values between the start_time and end_time.

  • end_time (datetime.datetime) - The job will materialize feature values between the start_time and end_time.

  • online (bool) - Whether the job will materialize features to the online store. The Feature View must be configured with online=True in order to materialize features online.

  • offline (bool) - Whether the job will materialize features to the offline store. The Feature View must be configured with offline=True in order to materialize features offline.

  • use_tecton_managed_retries (bool) - If enabled, Tecton will automatically retry failed attempts. Disable to manage your own retry behavior. Default: true

  • overwrite (bool) - If enabled, materialization jobs can run for periods that already have materialized data. Use care with feature views that have existing materialized online data. For the offline store, all previously materialized data between the start time and end time is dropped. For the online store, all previous data remains but may be overwritten by this job. Default: false

Returns

str: ID string of the created materialization job.

Raises

  • TectonValidationError: If job params are not valid.

wait_for_materialization_job()

Blocks until the specified job has been completed.

Parameters

  • job_id (str) - ID string of the materialization job.

  • timeout (Optional[datetime.timedelta]) - Timeout for this function. An exception is raised if the job does not complete within the specified time. Default: None

Returns

MaterializationJob: JobData object for the successful job.

Raises

  • JobTimeoutException: If timeout param is specified and job does not complete within the specified time.
  • MaterializationJobFailedException: If materialization job did not reach a successful state.
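The blocking and timeout behavior described above follows a common polling pattern. This sketch is not the SDK's implementation: get_state, the state strings "SUCCESS" and "FAILED", and both exception classes are hypothetical stand-ins, and the clock and sleep functions are injectable only so the loop can be exercised without real waiting.

```python
import time
from datetime import timedelta
from typing import Callable, Optional


class JobTimeout(Exception):
    """Hypothetical stand-in for the documented JobTimeoutException."""


class JobFailed(Exception):
    """Hypothetical stand-in for the documented MaterializationJobFailedException."""


def wait_for_job(
    get_state: Callable[[], str],
    timeout: Optional[timedelta] = None,
    poll_seconds: float = 5.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until the job succeeds, fails, or the optional timeout expires."""
    deadline = None if timeout is None else clock() + timeout.total_seconds()
    while True:
        state = get_state()
        if state == "SUCCESS":
            return state
        if state == "FAILED":
            raise JobFailed("materialization job did not reach a successful state")
        # With no timeout, keep waiting indefinitely, as the documented default does.
        if deadline is not None and clock() >= deadline:
            raise JobTimeout(f"job still {state} after {timeout}")
        sleep(poll_seconds)
```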

with_join_key_map()

Rebind join keys for a Feature View or Feature Table used in a Feature Service.
 
The keys in join_key_map should be the join keys, and the values should be the feature service overrides.

Parameters

  • join_key_map (Dict[str, str]) - Dictionary remapping the join key names. Dictionary keys are join keys, values are the feature service override values.

Returns

FeatureReference

Example

from tecton import FeatureService

# The join key for this feature service will be "feature_service_user_id".
feature_service = FeatureService(
    name="feature_service",
    features=[
        my_feature_view.with_join_key_map({"user_id": "feature_service_user_id"}),
    ],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],
        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
    ],
)
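To make the direction of join_key_map concrete, the sketch below resolves a single feature service request into lookup keys for each rebound copy of user_features from the example above. It models only the documented key/value direction (keys are the Feature View's join keys, values are the feature service names); the helper name and request values are hypothetical and this is not an SDK function.

```python
from typing import Any, Dict


def feature_view_lookup_keys(request: Dict[str, Any], join_key_map: Dict[str, str]) -> Dict[str, Any]:
    """Map a feature service request back onto a feature view's own join keys.

    join_key_map follows the documented direction:
    {feature_view_join_key: feature_service_join_key}.
    """
    return {fv_key: request[service_key] for fv_key, service_key in join_key_map.items()}


request = {"transaction_id": "txn_1", "sender_id": 7, "recipient_id": 9}
print(feature_view_lookup_keys(request, {"user_id": "sender_id"}))     # {'user_id': 7}
print(feature_view_lookup_keys(request, {"user_id": "recipient_id"}))  # {'user_id': 9}
```

Because each copy of user_features reads a different request key, the same Feature View can serve both the sender and the recipient in one request.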

with_name()

Rename a Feature View or Feature Table used in a Feature Service.

Parameters

  • namespace (str) - The namespace used to prefix the features joined from this FeatureView. By default, namespace is set to the FeatureView name.

Returns

FeatureReference

Examples

from tecton import FeatureService

# The feature view in this feature service will be named "new_named_feature_view" in training data dataframe
# columns and other metadata.
feature_service = FeatureService(
    name="feature_service",
    features=[
        my_feature_view.with_name("new_named_feature_view"),
    ],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],
        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
    ],
)

validate()

This method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon object creation, so explicit validation is unnecessary.

Returns

None
