Skip to main content
Version: 1.0

RealtimeFeatureView

Summary​

A Tecton Realtime Feature View.
 
The RealtimeFeatureView should not be instantiated directly and the @realtime_feature_view decorator is recommended instead.

Examples​

Example Python mode​

An example declaration of a Realtime Feature View using Python mode. With Python mode, the function sources will be dictionaries, and the function is expected to return a dictionary matching the schema from feature_schema. Tecton recommends using Python mode for improved online serving performance.

from tecton import RequestSource, realtime_feature_view, Attribute
from tecton.types import Field, Float64, Int64

# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])

# Define the feature schema
feature_schema = [Attribute("transaction_amount_is_high", Int64)]

# This Realtime Feature View evaluates a transaction amount and declares it as "high", if it's higher than 10,000
@realtime_feature_view(
sources=[transaction_request],
mode="python",
features=feature_schema,
description="Whether the transaction amount is considered high (over $10000)",
)
def transaction_amount_is_high(transaction_request):
result = {}
result["transaction_amount_is_high"] = int(transaction_request["amount"] >= 10000)
return result

Example Pandas mode​

An example declaration of a Realtime Feature View using Pandas mode. With Pandas mode, the function sources will be Pandas Dataframes, and the function is expected to return a Dataframe matching the schema from feature_schema.

from tecton import RequestSource, realtime_feature_view
from tecton.types import Field, Float64, Int64
import pandas

# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])

# Define the feature schema
feature_schema = [Attribute("transaction_amount_is_high", Int64)]

# This Realtime Feature View evaluates a transaction amount and declares it as "high", if it's higher than 10,000
@realtime_feature_view(
sources=[transaction_request],
mode="pandas",
features=feature_schema,
description="Whether the transaction amount is considered high (over $10000)",
)
def transaction_amount_is_high(transaction_request):
import pandas as pd

df = pd.DataFrame()
df["transaction_amount_is_high"] = (transaction_request["amount"] >= 10000).astype("int64")
return df

@realtime_feature_view (Decorator)​

Declare a Realtime Feature View. With Python mode, the function sources will be dictionaries, and the function is expected to return a dictionary matching the schema from output_schema. Tecton recommends using Python mode for improved online serving performance.

Parameters

  • mode (str) - (Required) Whether the annotated function is a pipeline function ("pipeline" mode) or a transformation function ("python" or "pandas" mode). For the non-pipeline mode, an inferred transformation will also be registered.

  • sources (List[Union[configs.RequestSource, FeatureView, 'FeatureReference']]) - (Required) The data source inputs to the feature view. An input can be a RequestSource, a BatchFeatureView, or a StreamFeatureView

  • features (List[feature.Attribute]) - (Required) A list of features this feature view manages.

  • name (Optional[str]) - Unique, human friendly name that identifies the FeatureView. Defaults to the function name. Default: None

  • description (Optional[str]) - A human readable description. Default: None

  • owner (Optional[str]) - Owner name (typically the email of the primary maintainer). Default: None

  • tags (Optional[Dict[str, str]]) - Tags associated with this Tecton Object (key-value pairs of arbitrary metadata). Default: None

  • prevent_destroy (bool) - If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object, prevent_destroy must be set to False via the same tecton apply or a separate tecton apply. prevent_destroy can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs. prevent_destroy also blocks changes to dependent Tecton objects that would trigger a recreate of the tagged object, e.g. if prevent_destroy is set on a Feature Service, that will also prevent deletions or re-creates of Feature Views used in that service. prevent_destroy is only enforced in live (i.e. non-dev) workspaces. Default: false

  • environments (Optional[List[str]]) - The environments in which this feature view can run. Defaults to None, which means the feature view can run in any environment. If specified, the feature view will only run in the specified environments. Learn more about environments at Realtime Feature View Environments. Default: None

  • context_parameter_name (Optional[str]) - Name of the function parameter that Tecton injects Realtime Context to. This context is a RealtimeContext object for Python mode FVs and a pandas.DataFrame object for Pandas mode FVs. Default: None

Returns

An object of type RealtimeFeatureView.

RealtimeFeatureView (class)​

Attributes​

NameData TypeDescription
context_parameter_nameOptional[str]Name of the function parameter that Tecton injects Realtime Context to.
created_atOptional[datetime.datetime]Returns the time that this Tecton object was created or last updated. None for locally defined objects.
defined_inOptional[str]The repo filename where this object was declared. None for locally defined objects.
descriptionOptional[str]Returns the description of the Tecton object.
environmentsTuple[str, ...]The environment in which this feature view runs.
feature_metadataList[FeatureMetadata]
idstrReturns the unique id of the Tecton object.
info
join_keysList[str]The join key column names.
namestrReturns the name of the Tecton object.
online_serving_indexList[str]The set of join keys that will be indexed and queryable during online serving.
 
defaults to the complete set of join keys.
ownerOptional[str]Returns the owner of the Tecton object.
prevent_destroyboolIf set to True, Tecton will block destructive actions taken on this Feature View or Feature Table.
sources
tagsDict[str, str]Returns the tags of the Tecton object.
transformations
urlstrReturns a link to the Tecton Web UI.
wildcard_join_keyOptional[set]Returns a wildcard join key column name if it exists; Otherwise returns None.
workspaceOptional[str]Returns the workspace that this Tecton object belongs to. None for locally defined objects.

Methods​

NameDescription
__init__(...)Initialize RealtimeFeatureView
cancel_materialization_job(...)Cancels the scheduled or running job by the job identifier.
get_feature_columns()Retrieves the list of feature columns produced by this FeatureView.
get_features_for_events(...)Returns a TectonDataFrame of historical values for this feature view.
get_historical_features(...)Returns a TectonDataFrame of historical values for this feature view.
get_job(...)Retrieves data about the specified job (materialization or dataset generation).
get_materialization_job(...)Retrieves data about the specified materialization job.
get_online_features(...)Returns a single Tecton tecton.FeatureVector from the Online Store.
list_jobs()Retrieves the list of all jobs (materialization and dataset generation) for this Feature View or Feature Table.
list_materialization_jobs()Retrieves the list of all materialization jobs for this feature view.
print_transformation_schema()Prints the schema of the output of the transformation.
run(...)Run the RealtimeFeatureView using mock inputs.
run_transformation(...)Run the RealtimeFeatureView using mock inputs.
summary()Displays a human-readable summary.
test_run(...)Run the RealtimeFeatureView using mock sources.
transformation_schema()Returns the schema of the output of the transformation.
validate()Method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon object creation, so validation is unnecessary.
with_join_key_map(...)Rebind join keys for a Feature View or Feature Table used in a Feature Service.
with_name(...)Rename a Feature View or Feature Table used in a Feature Service.

cancel_materialization_job(...)​

Cancels the scheduled or running job by the job identifier. Once cancelled, a job will not be retried further.
 
Job run state will be set to MANUAL_CANCELLATION_REQUESTED. Note that cancellation is asynchronous, so it may take some time for the cancellation to complete. If job run is already in MANUAL_CANCELLATION_REQUESTED or in a terminal state then it'll return the job.

Parameters

  • job_id (str) - ID string of the materialization job.

Returns

MaterializationJob: JobData object for the cancelled job.

get_feature_columns(...)​

Retrieves the list of feature columns produced by this FeatureView.

Returns

List[str]: The features produced by this FeatureView.

get_features_for_events(...)​

info

This method is functionally equivalent to get_historical_features(spine) and has been renamed in Tecton 0.8 for clarity. get_historical_features() is planned to be deprecated in a future release.

Returns a TectonDataFrame of historical values for this feature view.
 
By default (i.e. from_source=None), this method fetches feature values from the Offline Store for input Feature Views that have offline materialization enabled and otherwise computes input feature values on the fly from raw data.

Parameters

  • events (Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame, str]) - A dataframe of possible join keys, request data keys, and timestamps that specify which feature values to fetch. The returned data frame will contain rollups for all (join key, request data key) combinations that are required to compute a full frame from the events dataframe.

  • timestamp_key (Optional[str]) - Name of the time column in spine. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified and this feature view has feature view dependencies, timestamp_key will default to the time column of the spine if there is only one present. Default: None

  • from_source (Optional[bool]) - Whether feature values should be recomputed from the original data source. If None, input feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any input Feature Views are not materialized. Defaults to None. Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Default: None

Returns

TectonDataFrame: A TectonDataFrame.

Examples​

An RealtimeFeatureView fv that expects request time data for the key amount:

The request time data is defined in the feature definition as such:

request_schema = StructType()
request_schema.add(StructField("amount", DoubleType()))
transaction_request = RequestDataSource(request_schema=request_schema)
  1. fv.get_features_for_events(events) where events=pandas.Dataframe({'amount': [30, 50, 10000]}) Fetch historical features from the offline store with request time data inputs 30, 50, and 10000 for key ‘amount’.

  2. fv.get_features_for_events(events, save_as='my_dataset') where events=pandas.Dataframe({'amount': [30, 50, 10000]}) Fetch historical features from the offline store request time data inputs 30, 50, and 10000 for key ‘amount’. Save the DataFrame as dataset with the name ‘my_dataset’.

A RealtimeFeatureView fv that expects request time data for the key amount and has a feature view dependency with join key user_id:

  1. fv.get_features_for_events(events) where events=pandas.Dataframe({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'amount': [30, 50, 10000]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps and values for amount in the events dataframe.

get_historical_features(...)​

info

The timestamp_key parameter is only applicable when a spine is passed in. Parameters start_time, end_time, and entities are only applicable when a spine is not passed in.

info

The ability to run get_historical_features as part of a unit test was added in SDK 0.7. To utilize this, provide the mocked data sources in the mock_inputs parameter in a test that is run via tecton test or pytest.

Returns a TectonDataFrame of historical values for this feature view.
 
By default (i.e. from_source=None), this method fetches feature values from the Offline Store for input Feature Views that have offline materialization enabled and otherwise computes input feature values on the fly from raw data.

Parameters

  • spine (Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame, str]) - The spine to join against, as a dataframe. The returned data frame will contain rollups for all (join key, request data key) combinations that are required to compute a full frame from the spine.

  • timestamp_key (Optional[str]) - Name of the time column in spine. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified and this feature view has feature view dependencies, timestamp_key will default to the time column of the spine if there is only one present. Default: None

  • from_source (Optional[bool]) - Whether feature values should be recomputed from the original data source. If None, input feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any input Feature Views are not materialized. Defaults to None. Default: None

  • compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the data frame. Default: None

Returns

TectonDataFrame: A TectonDataFrame.

Examples​

An RealtimeFeatureView fv that expects request time data for the key amount:

The request time data is defined in the feature definition as such:

request_schema = StructType()
request_schema.add(StructField("amount", DoubleType()))
transaction_request = RequestDataSource(request_schema=request_schema)
  1. fv.get_historical_features(spine) where spine=pandas.Dataframe({'amount': [30, 50, 10000]}) Fetch historical features from the offline store with request time data inputs 30, 50, and 10000 for key ‘amount’.

  2. fv.get_historical_features(spine, save_as='my_dataset') where spine=pandas.Dataframe({'amount': [30, 50, 10000]}) Fetch historical features from the offline store request time data inputs 30, 50, and 10000 for key ‘amount’. Save the DataFrame as dataset with the name ‘my_dataset’.

A RealtimeFeatureView fv that expects request time data for the key amount and has a feature view dependency with join key user_id:

  1. fv.get_historical_features(spine) where spine=pandas.Dataframe({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'amount': [30, 50, 10000]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps and values for amount in the spine.

get_materialization_job(...)​

Retrieves data about the specified materialization job.

Parameters

  • job_id (str) - ID string of the materialization job.

Returns

MaterializationJob: MaterializationJobData object for the job.

get_online_features(...)​

Returns a single Tecton tecton.FeatureVector from the Online Store.

Parameters

  • join_keys (Optional[Mapping[str, Union[int, numpy.int_, str, bytes]]]) - Join keys of the enclosed FeatureViews. Default: None

  • include_join_keys_in_response (bool) - Whether to include join keys as part of the response FeatureVector. Default: false

  • request_data (Optional[Mapping[str, Union[int, numpy.int_, str, bytes, float]]]) - Dictionary of request context values used for RealtimeFeatureViews. Default: None

Returns

FeatureVector: A tecton.FeatureVector of the results.

Examples​

An RealtimeFeatureVIew fv that expects request time data for the key amount.

The request time data is defined in the feature definition as such:

request_schema = StructType()
request_schema.add(StructField("amount", DoubleType()))
transaction_request = RequestDataSource(request_schema=request_schema)
  1. fv.get_online_features(request_data={'amount': 50}) Fetch the latest features with input amount=50.

An RealtimeFeatureView fv that has a feature view dependency with join key user_id and expects request time data for the key amount.

  1. fv.get_online_features(join_keys={'user_id': 1}, request_data={'amount': 50}, include_join_keys_in_response=True)

Fetch the latest features from the online store for user 1 with input amount=50. In the returned FeatureVector, include the join key information (user_id=1).

list_materialization_jobs()​

Retrieves the list of all materialization jobs for this feature view.

Returns

List[MaterializationJob]: List of MaterializationJobData objects.
Prints the schema of the output of the transformation.

Returns

None

run(...)​

info

This method has been replaced by the .run_transformation() method and will be deprecated in a future release.

Run the RealtimeFeatureView using mock inputs.

Parameters

  • mock_inputs (Union[Dict[str, Any], pandas.DataFrame, pyspark_dataframe.DataFrame]) - Required. Keyword args with the same expected keys as the RealtimeFeatureView's inputs parameters. For the "python" mode, each input must be a Dictionary representing a single row. For the "pandas" mode, each input must be a DataFrame with all of them containing the same number of rows and matching row ordering.

Returns

Union[Dict[str, Any], TectonDataFrame]: A Dict object for the "python" mode and a tecton DataFrame of the results for the "pandas" mode.

Examples

# Given a Python Realtime Feature View defined in your workspace:
@realtime_feature_view(
sources=[transaction_request, user_transaction_amount_metrics],
mode="python",
features=features,
description="The transaction amount is higher than the 1 day average.",
)
def transaction_amount_is_higher_than_average(request, user_metrics):
return {"higher_than_average": request["amt"] > user_metrics["daily_average"]}
# Retrieve and run the Feature View in a notebook using mock data:
import tecton
fv = tecton.get_workspace("prod").get_feature_view("transaction_amount_is_higher_than_average")
result = fv.run(request={"amt": 100}, user_metrics={"daily_average": 1000})
print(result)
# {'higher_than_average': False}

run_transformation(...)​

Run the RealtimeFeatureView using mock inputs.

Parameters

  • input_data (Dict[str, Any]) - Required. Dict with the same expected keys as the RealtimeFeatureView's inputs parameters. For the "python" mode, each value must be a Dictionary representing a single row. For the "pandas" mode, each value must be a DataFrame with all of them containing the same number of rows and matching row ordering.

Returns

Union[Dict[str, Any], TectonDataFrame]: A Dict object for the "python" mode and a tecton DataFrame of the results for the "pandas" mode.

Examples

# Given a Python Realtime Feature View defined in your workspace:
@realtime_feature_view(
sources=[transaction_request, user_transaction_amount_metrics],
mode="python",
features=features,
description="The transaction amount is higher than the 1 day average.",
)
def transaction_amount_is_higher_than_average(request, user_metrics):
return {"higher_than_average": request["amt"] > user_metrics["daily_average"]}
# Retrieve and run the Feature View in a notebook using mock data:
import tecton
fv = tecton.get_workspace("prod").get_feature_view("transaction_amount_is_higher_than_average")
input_data = {"request": {"amt": 100}, "user_metrics": {"daily_average": 1000}}
result = fv.run_transformation(input_data=input_data)
print(result)
# {'higher_than_average': False}

summary()​

Displays a human-readable summary.

test_run(...)​

Run the RealtimeFeatureView using mock sources.
 
Unlike run, test_run is intended for unit testing. It will not make calls to your connected Tecton cluster to validate the RealtimeFeatureView.

Parameters

  • mock_inputs (Union[Dict[str, Any], pandas.DataFrame]) - Required. Keyword args with the same expected keys as the RealtimeFeatureView's inputs parameters. For the "python" mode, each input must be a Dictionary representing a single row. For the "pandas" mode, each input must be a DataFrame with all of them containing the same number of rows and matching row ordering.

Returns

Union[Dict[str, Any], pandas.DataFrame]: A Dict object for the "python" mode and a pandas.DataFrame object for the "pandas" mode".

Example

from datetime import datetime, timedelta
import pandas
from fraud.features.batch_features.user_credit_card_issuer import user_credit_card_issuer
# The `tecton_pytest_spark_session` is a PyTest fixture that provides a
# Tecton-defined PySpark session for testing Spark transformations and feature
# views.
def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_session):
input_pandas_df = pandas.DataFrame({
"user_id": ["user_1", "user_2", "user_3", "user_4"],
"signup_timestamp": [datetime(2022, 5, 1)] * 4,
"cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
})
input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)
# Simulate materializing features for May 1st.
output = user_credit_card_issuer.test_run(
start_time=datetime(2022, 5, 1),
end_time=datetime(2022, 5, 2),
fraud_users_batch=input_spark_df)
actual = output.to_pandas()
expected = pandas.DataFrame({
"user_id": ["user_1", "user_2", "user_3", "user_4"],
"signup_timestamp": [datetime(2022, 5, 1)] * 4,
"credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"],
})
pandas.testing.assert_frame_equal(actual, expected)

transformation_schema()​

Returns the schema of the output of the transformation.

Returns

List[types.Field]

with_join_key_map()​

Rebind join keys for a Feature View or Feature Table used in a Feature Service.
 
The keys in join_key_map should be the join keys, and the values should be the feature service overrides.

Parameters

  • join_key_map (Dict[str, str]) - Dictionary remapping the join key names. Dictionary keys are join keys, values are the feature service override values.

Returns

FeatureReference

Example

from tecton import FeatureService
# The join key for this feature service will be "feature_service_user_id".
feature_service = FeatureService(
name="feature_service",
features=[
my_feature_view.with_join_key_map({"user_id" : "feature_service_user_id"}),
],
)
# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
name="transaction_fraud_service",
features=[
# Select a subset of features from a feature view.
transaction_features[["amount"]],
# Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
# transaction sender and recipient, so include the feature view twice and bind it to two different feature
# service join keys.
user_features.with_name("sender_features").with_join_key_map({"user_id" : "sender_id"}),
user_features.with_name("recipient_features").with_join_key_map({"user_id" : "recipient_id"}),
],
)

with_name()​

Rename a Feature View or Feature Table used in a Feature Service.

Parameters

  • namespace (str) - The namespace used to prefix the features joined from this FeatureView. By default, namespace is set to the FeatureView name.

Returns

FeatureReference

Examples

from tecton import FeatureService
# The feature view in this feature service will be named "new_named_feature_view" in training data dataframe
# columns and other metadata.
feature_service = FeatureService(
name="feature_service",
features=[
my_feature_view.with_name("new_named_feature_view")
],
)
# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
name="transaction_fraud_service",
features=[
# Select a subset of features from a feature view.
transaction_features[["amount"]],
# Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
# transaction sender and recipient, so include the feature view twice and bind it to two different feature
# service join keys.
user_features.with_name("sender_features").with_join_key_map({"user_id" : "sender_id"}),
user_features.with_name("recipient_features").with_join_key_map({"user_id" : "recipient_id"}),
],
)

validate()​

Method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon object creation, so validation is unnecessary.

Returns

None

Was this page helpful?