tecton.interactive.StreamFeatureView

class tecton.interactive.StreamFeatureView(proto, fco_container)

StreamFeatureView class.

To get a FeatureView instance, call tecton.get_feature_view().

Methods

cancel_materialization_job

Cancels the scheduled or running batch materialization job for this Feature View specified by the job identifier.

delete_keys

Deletes any materialized data that matches the specified join keys from the FeatureView.

deletion_status

Displays information for deletion jobs created with the delete_keys() method, which may include past jobs, scheduled jobs, and job failures.

get_historical_features

Returns a Tecton TectonDataFrame of historical values for this feature view.

get_materialization_job

Retrieves data about the specified materialization job for this Feature View.

get_online_features

Returns a single Tecton tecton.FeatureVector from the Online Store.

list_materialization_jobs

Retrieves the list of all materialization jobs for this Feature View.

materialization_status

Displays materialization information for the FeatureView, which may include past jobs, scheduled jobs, and job failures.

run

Run the FeatureView.

run_stream

Starts a streaming job to keep writting the output records of this FeatureView to a temporary table.

summary

Returns various information about this feature definition, including the most critical metadata such as the name, owner, features, etc.

trigger_materialization_job

Starts a batch materialization job for this Feature View.

wait_for_materialization_job

Blocks until the specified job has been completed.

cancel_materialization_job(job_id)

Cancels the scheduled or running batch materialization job for this Feature View specified by the job identifier. Once cancelled, a job will not be retried further.

Job run state will be set to MANUAL_CANCELLATION_REQUESTED. Note that cancellation is asynchronous, so it may take some time for the cancellation to complete. If job run is already in MANUAL_CANCELLATION_REQUESTED or in a terminal state then it’ll return the job.

Parameters

job_id (str) – ID string of the materialization job.

Returns

MaterializationJobData object for the cancelled job.

delete_keys(keys, online=True, offline=True)

Deletes any materialized data that matches the specified join keys from the FeatureView. This method kicks off a job to delete the data in the offline and online stores. If a FeatureView has multiple entities, the full set of join keys must be specified. Only supports Delta offline store and Dynamo online store. (offline_store=DeltaConfig() and online_store left as default) Maximum 10000 keys can be deleted per request.

Parameters
  • keys (Union[DataFrame, DataFrame]) – The Dataframe to be deleted. Must conform to the FeatureView join keys.

  • online (bool) – (Optional, default=True) Whether or not to delete from the online store.

  • offline (bool) – (Optional, default=True) Whether or not to delete from the offline store.

Returns

None if deletion job was created successfully.

deletion_status(verbose=False, limit=1000, sort_columns=None, errors_only=False)

Displays information for deletion jobs created with the delete_keys() method, which may include past jobs, scheduled jobs, and job failures.

Parameters
  • verbose – If set to true, method will display additional low level deletion information, useful for debugging.

  • limit – Maximum number of jobs to return.

  • sort_columns – A comma-separated list of column names by which to sort the rows.

Param

errors_only: If set to true, method will only return jobs that failed with an error.

get_historical_features(spine=None, timestamp_key=None, start_time=None, end_time=None, entities=None, from_source=False, save=False, save_as=None)

Returns a Tecton TectonDataFrame of historical values for this feature view. If no arguments are passed in, all feature values for this feature view will be returned in a Tecton DataFrame.

Note: The timestamp_key parameter is only applicable when a spine is passed in. Parameters start_time, end_time, and entities are only applicable when a spine is not passed in.

Parameters
  • spine (Union[pyspark.sql.DataFrame, pandas.DataFrame, TectonDataFrame]) – The spine to join against, as a dataframe. If present, the returned DataFrame will contain rollups for all (join key, temporal key) combinations that are required to compute a full frame from the spine. To distinguish between spine columns and feature columns, feature columns are labeled as feature_view_name.feature_name in the returned DataFrame. If spine is not specified, it’ll return a DataFrame of feature values in the specified time range.

  • timestamp_key (str) – Name of the time column in the spine. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified, will default to the time column of the spine if there is only one present. If more than one time column is present in the spine, you must specify which column you’d like to use.

  • start_time (datetime.datetime) – The interval start time from when we want to retrieve features. If no timezone is specified, will default to using UTC.

  • end_time (datetime.datetime) – The interval end time until when we want to retrieve features. If no timezone is specified, will default to using UTC.

  • entities (Union[pyspark.sql.DataFrame, pandas.DataFrame, TectonDataFrame]) – Filter feature data returned to a set of entity IDs. If specified, this DataFrame should only contain join key columns.

  • from_source (bool) – Whether feature values should be recomputed from the original data source. If False, we will read the materialized values from the offline store.

  • save (bool) – Whether to persist the DataFrame as a Dataset object. Default is False.

  • save_as (str) – Name to save the DataFrame as. If unspecified and save=True, a name will be generated.

Examples

A FeatureView fv with join key user_id.

1) fv.get_historical_features(spine) where spine=pandas.Dataframe({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the spine.

2) fv.get_historical_features(spine, save_as='my_dataset) where spine=pandas.Dataframe({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the spine. Save the DataFrame as dataset with the name :py:mod`my_dataset`.

3) fv.get_historical_features(spine, timestamp_key='date_1') where spine=pandas.Dataframe({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'date_2': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the ‘date_1’ column in the spine.

4) fv.get_historical_features(start_time=datetime(...), end_time=datetime(...)) Fetch all historical features from the offline store in the time range specified by start_time and end_time.

Returns

A Tecton TectonDataFrame.

get_materialization_job(job_id)

Retrieves data about the specified materialization job for this Feature View.

This data includes information about job attempts.

Parameters

job_id (str) – ID string of the materialization job.

Returns

MaterializationJobData object for the job.

get_online_features(join_keys, include_join_keys_in_response=False)

Returns a single Tecton tecton.FeatureVector from the Online Store.

Parameters
  • join_keys (Mapping[str, Union[int, int64, str, bytes]]) – Join keys of the enclosed FeatureViews.

  • include_join_keys_in_response (bool) – Whether to include join keys as part of the response FeatureVector.

Examples

A FeatureView fv with join key user_id.

1) fv.get_online_features(join_keys={'user_id': 1}) Fetch the latest features from the online store for user 1.

2) fv.get_online_features(join_keys={'user_id': 1}, include_join_keys_in_respone=True) Fetch the latest features from the online store for user 1 and include the join key information (user_id=1) in the returned FeatureVector.

Returns

A tecton.FeatureVector of the results.

list_materialization_jobs()

Retrieves the list of all materialization jobs for this Feature View.

Returns

List of MaterializationJobData objects.

materialization_status(verbose=False, limit=1000, sort_columns=None, errors_only=False)

Displays materialization information for the FeatureView, which may include past jobs, scheduled jobs, and job failures. This method returns different information depending on the type of FeatureView.

Parameters
  • verbose – If set to true, method will display additional low level materialization information, useful for debugging.

  • limit – Maximum number of jobs to return.

  • sort_columns – A comma-separated list of column names by which to sort the rows.

Param

errors_only: If set to true, method will only return jobs that failed with an error.

run(start_time=None, end_time=None, aggregation_level=None, **mock_sources)

Run the FeatureView. Supports transforming data directly from raw data sources or using mock data.

To run the feature view with data from raw data sources, the environment must have access to the data sources.

Parameters
  • start_time (Optional[datetime]) – The start time of the time window to materialize. If not set, defaults to end_time minus batch_schedule. If end_time is also not set, defaults to the start of the last complete materialization period.

  • end_time (Optional[datetime]) – The end time of the time window to materialize. If not set, defaults to start_time plus batch_schedule. If start_time is also not set, defaults to the end of the last complete materialization period.

  • aggregation_level (Optional[str]) –

    For feature views with aggregations, aggregation_level configures which stage of the aggregation to run up to.

    The query for Aggregate Feature Views operates in three steps:

    1. The feature view query is run over the provided time range. The user defined transformations are applied over the data source.

    2. The result of #1 is aggregated into tiles the size of the aggregation_interval.

    3. The tiles from #2 are combined to form the final feature values. The number of tiles that are combined is based off of the time_window of the aggregation.

    For testing and debugging purposes, to see the output of #1, use aggregation_level="disabled". For #2, use aggregation_level="partial". For #3, use aggregation_level="full".

    aggregation_level="full" is the default behavior.

  • **mock_sources – kwargs for mock sources that should be used instead of fetching directly from raw data soruces. The keys should match the feature view’s function parameters. For feature views with multiple sources, mocking some data sources and using raw data for others is supported.

Returns

A tecton DataFrame of the results.

Example:

import tecton
import pandas
from datetime import datetime

# Example running a non-aggregate feature view with mock data.
feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_feature_view")

mock_fraud_user_data = pandas.DataFrame({
    "user_id": ["user_1", "user_2", "user_3"],
    "timestamp": [datetime(2022, 5, 1, 0), datetime(2022, 5, 1, 2), datetime(2022, 5, 1, 5)],
    "credit_card_number": [1000, 4000, 5000],
})

result = feature_view.run(
  start_time=datetime(2022, 5, 1),
  end_time=datetime(2022, 5, 2),
  fraud_users_batch=mock_fraud_user_data)  # `fraud_users_batch` is the name of this FeatureView's data source parameter.

# Example running an aggregate feature view with real data.
aggregate_feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_aggregate_feature_view")

result = aggregate_feature_view.run(
  start_time=datetime(2022, 5, 1),
  end_time=datetime(2022, 5, 2),
  aggregation_level="full")  # or "partial" or "disabled"
run_stream(output_temp_table)

Starts a streaming job to keep writting the output records of this FeatureView to a temporary table. The job will be running until the execution is terminated.

After records have been written to the table, they can be queried using spark.sql(). If ran in a Databricks notebook, Databricks will also automatically visualize the number of incoming records.

Parameters

output_temp_table (str) – The name of the temporary table to write to.

Example

  1. fv.run_stream(output_temp_table="temp_table") Start a streaming job.

  2. display(spark.sql("SELECT * FROM temp_table LIMIT 5")) Query the output table, and display the output dataframe.

summary()

Returns various information about this feature definition, including the most critical metadata such as the name, owner, features, etc.

trigger_materialization_job(start_time, end_time, online, offline, use_tecton_managed_retries=True, overwrite=False)

Starts a batch materialization job for this Feature View.

Parameters
  • start_time (datetime) – The job will materialize feature values between the start_time and end_time.

  • end_time (datetime) – The job will materialize feature values between the start_time and end_time.

  • online (bool) – Whether the job will materialize features to the online store. The Feature View must be configured with online=True in order to materialize features online.

  • offline (bool) – Whether the job will materialize features to the offline store. The Feature View must be configured with offline=True in order to materialize features offline.

  • use_tecton_managed_retries (bool) – If enabled, Tecton will automatically retry failed attempts. Disable to manage your own retry behavior.

  • overwrite (bool) – If enabled, you will be able to run materialization jobs for periods that previously have materialized data. This operation can be sensitive for feature views with existing materialized online data. For the offline store, all previously materialized data between the start time and end time will be dropped. For the online store, all previous data will remain, but may be overwritten by this job.

Returns

ID string of the created materialization job.

Raises

TectonValidationError – If job params are not valid.

wait_for_materialization_job(job_id, timeout=None)

Blocks until the specified job has been completed.

Parameters
  • job_id (str) – ID string of the materialization job.

  • timeout (Optional[timedelta]) – (Optional) timeout for this function. An exception is raised if the job does not complete within the specified time.

Returns

MaterializationJobData object for the successful job.

Raises
  • MaterializationTimeoutException – If timeout param is specified and job does not complete within the specified time.

  • MaterializationJobFailedException – If materialization job did not reach a successful state.

Attributes

batch_schedule

This represents how often we schedule batch materialization jobs.

created_at

Returns the creation date of this Tecton Object.

data_source_names

Returns the names of the data sources for this Feature View.

defined_in

Returns filename where this Tecton Object has been declared.

description

The description of this Tecton Object, set by user.

entity_names

Returns the names of entities for this Feature View.

family

Deprecated.

feature_start_time

This represents the time at which features are first available.

features

Returns the names of the (output) features.

id

Returns the id of this object

is_batch_trigger_manual

Whether batch materialization jobs must be explicitly initiated by the user.

is_on_demand

Deprecated.

is_temporal

Deprecated.

is_temporal_aggregate

Deprecated.

join_keys

Returns the join key column names

max_data_delay

The maximum data_delay for a data source input to this feature view.

name

The name of this Tecton Object.

online_serving_index

Returns Defines the set of join keys that will be indexed and queryable during online serving.

owner

The owner of this Tecton Object (typically the email of the primary maintainer.)

tags

Tags associated with this Tecton Object (key-value pairs of arbitrary metadata set by user.)

timestamp_field

Returns the timestamp_field of this FeatureView.

url

Returns a link to the Tecton Web UI.

wildcard_join_key

Returns a wildcard join key column name if it exists; Otherwise returns None.

workspace

Returns the workspace this Tecton Object was created in.