RealtimeFeatureView
A RealtimeFeatureView is a Feature View that executes transformations at request time, operating on data from Request Sources and Batch or Stream Feature Views.
Feature Types​
Realtime Feature Views support the following feature types:
Calculation​
Defines a SQL-like expression that is efficiently executed directly in the Feature Server and avoids the overhead of running a Python or Pandas transformation.
Calculation(
    name: str,
    expr: str,
    description: Optional[str],
    tags: Optional[Dict[str, str]],
)
Parameters​
- name (str): Feature name
- expr (str): SQL-like expression defining the calculation
- description (Optional[str]): A human-readable description of the feature.
- tags (Optional[Dict[str, str]]): Tags associated with the feature (key-value pairs of user-defined metadata).
Attribute​
Used to project an input field directly to an output feature.
Attribute(
    name: str,
    dtype: DataType,
    description: Optional[str],
    input_column: Optional[Field],
)
Parameters​
- name (str): Feature name
- dtype (DataType): Feature data type
- description (Optional[str]): A human-readable description of the feature.
- tags (Optional[Dict[str, str]]): Tags associated with the feature (key-value pairs of user-defined metadata).
- input_column (Optional[Field]): The input field to project to the output feature.
Examples​
Realtime Feature Views with Calculation Features​
When using a Calculation feature, define the expression in the expr parameter using one of the supported functions.
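from tecton import Calculation, RealtimeFeatureView

# request_ds and user_metrics are assumed to be defined elsewhere in the feature repository.
transaction_analysis = RealtimeFeatureView(
    name="transaction_analysis",
    sources=[request_ds, user_metrics],
    features=[
        Calculation(
            name="amount_normalized",
            expr="COALESCE(user_metrics.amount_mean, 0) / 100",
        )
    ],
)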
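To make the expression's semantics concrete, here is a plain-Python sketch of what `COALESCE(user_metrics.amount_mean, 0) / 100` evaluates to for a single request. It is an illustration only: the function name `amount_normalized` and the dictionary input are hypothetical, and the Feature Server evaluates the expression natively rather than through Python.

```python
from typing import Optional


def amount_normalized(user_metrics: dict) -> float:
    """Plain-Python equivalent of COALESCE(user_metrics.amount_mean, 0) / 100."""
    amount_mean: Optional[float] = user_metrics.get("amount_mean")
    # COALESCE returns its first non-null argument, so a missing mean becomes 0.
    coalesced = amount_mean if amount_mean is not None else 0
    return coalesced / 100


print(amount_normalized({"amount_mean": 250.0}))  # 2.5
print(amount_normalized({"amount_mean": None}))   # 0.0
```

The null handling is the point of the example: without COALESCE, a user with no stored amount_mean would yield a null feature value instead of 0.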
Realtime Feature Views with Python Mode Transformations​
With Python mode, the function inputs will be dictionaries, and the function is expected to return a dictionary matching the schema specified by the features parameter. Tecton recommends using Python mode for improved online serving performance.
from tecton import RequestSource, realtime_feature_view, Attribute
from tecton.types import Field, Float64, Int64

# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])

# Define the feature schema
feature_schema = [Attribute("transaction_amount_is_high", Int64)]


# This Realtime Feature View evaluates a transaction amount and declares it as "high", if it's higher than 10,000
@realtime_feature_view(
    sources=[transaction_request],
    mode="python",
    features=feature_schema,
    description="Whether the transaction amount is considered high (over $10000)",
)
def transaction_amount_is_high(transaction_request):
    result = {}
    result["transaction_amount_is_high"] = int(transaction_request["amount"] >= 10000)
    return result
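Because a Python mode transformation is an ordinary function from dictionaries to a dictionary, its logic can be checked without Tecton. The sketch below copies the body of transaction_amount_is_high without the decorator; the name `transaction_amount_is_high_logic` is hypothetical and exists only for this illustration.

```python
def transaction_amount_is_high_logic(transaction_request: dict) -> dict:
    """Undecorated copy of the transformation body from the example above."""
    result = {}
    # int(True) == 1 and int(False) == 0, which fits the Int64 feature type.
    result["transaction_amount_is_high"] = int(transaction_request["amount"] >= 10000)
    return result


print(transaction_amount_is_high_logic({"amount": 50.0}))     # {'transaction_amount_is_high': 0}
print(transaction_amount_is_high_logic({"amount": 20000.0}))  # {'transaction_amount_is_high': 1}
```

The returned dictionary has exactly one key, matching the single Attribute declared in feature_schema.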
Realtime Feature Views with Pandas Mode Transformations​
With Pandas mode, the function inputs will be Pandas DataFrames, and the function is expected to return a Pandas DataFrame matching the schema specified by the features parameter. Pandas mode is recommended if you'd like to optimize for efficient offline feature retrieval workloads.
from tecton import RequestSource, realtime_feature_view, Attribute
from tecton.types import Field, Float64, Int64
import pandas

# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])

# Define the feature schema
feature_schema = [Attribute("transaction_amount_is_high", Int64)]


# This Realtime Feature View evaluates a transaction amount and declares it as "high", if it's higher than 10,000
@realtime_feature_view(
    sources=[transaction_request],
    mode="pandas",
    features=feature_schema,
    description="Whether the transaction amount is considered high (over $10000)",
)
def transaction_amount_is_high(transaction_request):
    import pandas as pd

    df = pd.DataFrame()
    # Compare the whole "amount" column at once and cast the booleans to int64.
    df["transaction_amount_is_high"] = (transaction_request["amount"] >= 10000).astype("int64")
    return df
@realtime_feature_view (Decorator)​
Declare a Realtime Feature View. With Python mode, the function sources will be dictionaries, and the function is expected to return a dictionary matching the schema from features.
Tecton recommends using Python mode for improved online serving performance.
Parameters
- mode (str): (Required) Whether the annotated function is a pipeline function ("pipeline" mode) or a transformation function ("python" or "pandas" mode). For the non-pipeline modes, an inferred transformation will also be registered.
- sources (List[Union[configs.RequestSource, FeatureView, 'FeatureReference']]): (Required) The data source inputs to the feature view. An input can be a RequestSource, a BatchFeatureView, or a StreamFeatureView.
- features (List[feature.Attribute]): (Required) A list of features this feature view manages.
- name (Optional[str]): Unique, human-friendly name that identifies the FeatureView. Defaults to the function name. Default: None
- description (Optional[str]): A human-readable description. Default: None
- owner (Optional[str]): Owner name (typically the email of the primary maintainer). Default: None
- tags (Optional[Dict[str, str]]): Tags associated with this Tecton Object (key-value pairs of arbitrary metadata). Default: None
- prevent_destroy (bool): If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object, prevent_destroy must be set to False via the same tecton apply or a separate tecton apply. prevent_destroy can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs. prevent_destroy also blocks changes to dependent Tecton objects that would trigger a recreate of the tagged object, e.g. if prevent_destroy is set on a Feature Service, that will also prevent deletions or re-creates of Feature Views used in that service. prevent_destroy is only enforced in live (i.e. non-dev) workspaces. Default: false
- environments (Optional[List[str]]): The environments in which this feature view can run. Defaults to None, which means the feature view can run in any environment. If specified, the feature view will only run in the specified environments. Learn more about environments at Realtime Feature View Environments. Default: None
- context_parameter_name (Optional[str]): Name of the function parameter that Tecton injects Realtime Context to. This context is a RealtimeContext object for Python mode FVs and a pandas.DataFrame object for Pandas mode FVs. Default: None
- secrets (Optional[Dict[str, Union[Secret, str]]]): A dictionary of Secret references that will be resolved and provided to the Realtime transformation at runtime. During local development and testing, strings may be used instead of Secret references. Default: None
- resource_providers (Optional[Dict[str, resource_provider.ResourceProvider]]): A dictionary of Resource providers that will be evaluated, with the resulting resources provided to the transformation function at runtime. Default: None
Returns
An object of type RealtimeFeatureView.
RealtimeFeatureView (class)​
Attributes​
Name | Data Type | Description |
---|---|---|
context_parameter_name | Optional[str] | Name of the function parameter that Tecton injects Realtime Context to. |
created_at | Optional[datetime.datetime] | Returns the time that this Tecton object was created or last updated. None for locally defined objects. |
defined_in | Optional[str] | The repo filename where this object was declared. None for locally defined objects. |
description | Optional[str] | Returns the description of the Tecton object. |
environments | Tuple[str, ...] | The environment in which this feature view runs. |
feature_metadata | List[FeatureMetadata] | |
id | str | Returns the unique id of the Tecton object. |
info | ||
join_keys | List[str] | The join key column names. |
name | str | Returns the name of the Tecton object. |
online_serving_index | List[str] | The set of join keys that will be indexed and queryable during online serving. Defaults to the complete set of join keys. |
owner | Optional[str] | Returns the owner of the Tecton object. |
prevent_destroy | bool | If set to True, Tecton will block destructive actions taken on this Feature View or Feature Table. |
resource_providers | ||
sources | ||
tags | Dict[str, str] | Returns the tags of the Tecton object. |
transformations | ||
url | str | Returns a link to the Tecton Web UI. |
wildcard_join_key | Optional[set] | Returns a wildcard join key column name if it exists; otherwise returns None. |
workspace | Optional[str] | Returns the workspace that this Tecton object belongs to. None for locally defined objects. |
Methods​
Name | Description |
---|---|
__init__(...) | Initialize RealtimeFeatureView |
cancel_materialization_job(...) | Cancels the scheduled or running job by the job identifier. |
get_feature_columns() | Retrieves the list of feature columns produced by this FeatureView. |
get_features_for_events(...) | Returns a TectonDataFrame of historical values for this feature view. |
get_historical_features(...) | Returns a TectonDataFrame of historical values for this feature view. |
get_job(...) | Retrieves data about the specified job (materialization or dataset generation). |
get_materialization_job(...) | Retrieves data about the specified materialization job. |
get_online_features(...) | Returns a single Tecton tecton.FeatureVector from the Online Store. |
list_jobs() | Retrieves the list of all jobs (materialization and dataset generation) for this Feature View or Feature Table. |
list_materialization_jobs() | Retrieves the list of all materialization jobs for this feature view. |
print_transformation_schema() | Prints the schema of the output of the transformation. |
run(...) | Run the RealtimeFeatureView using mock inputs. |
run_transformation(...) | Run the RealtimeFeatureView using mock inputs. |
summary() | Displays a human-readable summary. |
test_run(...) | Run the RealtimeFeatureView using mock sources. |
transformation_schema() | Returns the schema of the output of the transformation. |
validate() | Method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon object creation, so validation is unnecessary. |
with_join_key_map(...) | Rebind join keys for a Feature View or Feature Table used in a Feature Service. |
with_name(...) | Rename a Feature View or Feature Table used in a Feature Service. |
cancel_materialization_job(...)​
Cancels the scheduled or running job by the job identifier. Once cancelled, a job will not be retried further.
Job run state will be set to MANUAL_CANCELLATION_REQUESTED. Note that cancellation is asynchronous, so it may take some time for the cancellation to complete. If the job run is already in MANUAL_CANCELLATION_REQUESTED or in a terminal state, the method returns the job.
Parameters
- job_id (str): ID string of the materialization job.
Returns
MaterializationJob: JobData object for the cancelled job.
get_feature_columns(...)​
Retrieves the list of feature columns produced by this FeatureView.
Returns
List[str]: The features produced by this FeatureView.
get_features_for_events(...)​
This method is functionally equivalent to get_historical_features(spine) and was renamed in Tecton 0.8 for clarity. get_historical_features() is planned to be deprecated in a future release.
Returns a TectonDataFrame of historical values for this feature view.
By default (i.e. from_source=None), this method fetches feature values from the Offline Store for input Feature Views that have offline materialization enabled and otherwise computes input feature values on the fly from raw data.
Parameters
- events (Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame, str]): A dataframe of possible join keys, request data keys, and timestamps that specify which feature values to fetch. The returned data frame will contain rollups for all (join key, request data key) combinations that are required to compute a full frame from the events dataframe.
- timestamp_key (Optional[str]): Name of the time column in the events dataframe. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified and this feature view has feature view dependencies, timestamp_key will default to the time column of the events dataframe if there is only one present. Default: None
- from_source (Optional[bool]): Whether feature values should be recomputed from the original data source. If None, input feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computed on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any input Feature Views are not materialized. Default: None
- compute_mode (Optional[Union[ComputeMode, str]]): Compute mode to use to produce the data frame. Default: None
Returns
TectonDataFrame: A TectonDataFrame.
Examples​
A RealtimeFeatureView fv that expects request time data for the key amount:
The request time data is defined in the feature definition as such:
transaction_request = RequestSource(schema=[Field("amount", Float64)])
- fv.get_features_for_events(events) where events=pandas.DataFrame({'amount': [30, 50, 10000]})
  Fetch historical features from the offline store with request time data inputs 30, 50, and 10000 for key ‘amount’.
- fv.get_features_for_events(events, save_as='my_dataset') where events=pandas.DataFrame({'amount': [30, 50, 10000]})
  Fetch historical features from the offline store with request time data inputs 30, 50, and 10000 for key ‘amount’. Save the DataFrame as a dataset with the name ‘my_dataset’.
A RealtimeFeatureView fv that expects request time data for the key amount and has a feature view dependency with join key user_id:
- fv.get_features_for_events(events) where events=pandas.DataFrame({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'amount': [30, 50, 10000]})
  Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps and values for amount in the events dataframe.
get_historical_features(...)​
The timestamp_key parameter is only applicable when a spine is passed in. Parameters start_time, end_time, and entities are only applicable when a spine is not passed in.
The ability to run get_historical_features as part of a unit test was added in SDK 0.7. To utilize this, provide the mocked data sources in the mock_inputs parameter in a test that is run via tecton test or pytest.
Returns a TectonDataFrame of historical values for this feature view.
By default (i.e. from_source=None), this method fetches feature values from the Offline Store for input Feature Views that have offline materialization enabled and otherwise computes input feature values on the fly from raw data.
Parameters
- spine (Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame, str]): The spine to join against, as a dataframe. The returned data frame will contain rollups for all (join key, request data key) combinations that are required to compute a full frame from the spine.
- timestamp_key (Optional[str]): Name of the time column in spine. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified and this feature view has feature view dependencies, timestamp_key will default to the time column of the spine if there is only one present. Default: None
- from_source (Optional[bool]): Whether feature values should be recomputed from the original data source. If None, input feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computed on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any input Feature Views are not materialized. Default: None
- compute_mode (Optional[Union[ComputeMode, str]]): Compute mode to use to produce the data frame. Default: None
Returns
TectonDataFrame: A TectonDataFrame.
Examples​
A RealtimeFeatureView fv that expects request time data for the key amount:
The request time data is defined in the feature definition as such:
transaction_request = RequestSource(schema=[Field("amount", Float64)])
- fv.get_historical_features(spine) where spine=pandas.DataFrame({'amount': [30, 50, 10000]})
  Fetch historical features from the offline store with request time data inputs 30, 50, and 10000 for key ‘amount’.
- fv.get_historical_features(spine, save_as='my_dataset') where spine=pandas.DataFrame({'amount': [30, 50, 10000]})
  Fetch historical features from the offline store with request time data inputs 30, 50, and 10000 for key ‘amount’. Save the DataFrame as a dataset with the name ‘my_dataset’.
A RealtimeFeatureView fv that expects request time data for the key amount and has a feature view dependency with join key user_id:
- fv.get_historical_features(spine) where spine=pandas.DataFrame({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'amount': [30, 50, 10000]})
  Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps and values for amount in the spine.
get_materialization_job(...)​
Retrieves data about the specified materialization job.
Parameters
- job_id (str): ID string of the materialization job.
Returns
MaterializationJob: MaterializationJobData object for the job.
get_online_features(...)​
Returns a single Tecton tecton.FeatureVector from the Online Store.
Parameters
- join_keys (Optional[Mapping[str, Union[int, numpy.int_, str, bytes]]]): Join keys of the enclosed FeatureViews. Default: None
- include_join_keys_in_response (bool): Whether to include join keys as part of the response FeatureVector. Default: false
- request_data (Optional[Mapping[str, Union[int, numpy.int_, str, bytes, float]]]): Dictionary of request context values used for RealtimeFeatureViews. Default: None
Returns
FeatureVector: A tecton.FeatureVector of the results.
Examples​
A RealtimeFeatureView fv that expects request time data for the key amount.
The request time data is defined in the feature definition as such:
transaction_request = RequestSource(schema=[Field("amount", Float64)])
fv.get_online_features(request_data={'amount': 50})
Fetch the latest features with input amount=50.
A RealtimeFeatureView fv that has a feature view dependency with join key user_id and expects request time data for the key amount.
fv.get_online_features(join_keys={'user_id': 1}, request_data={'amount': 50}, include_join_keys_in_response=True)
Fetch the latest features from the online store for user 1 with input amount=50. In the returned FeatureVector, include the join key information (user_id=1).
list_materialization_jobs()​
Retrieves the list of all materialization jobs for this feature view.
Returns
List[MaterializationJob]: List of MaterializationJobData objects.
print_transformation_schema()​
Prints the schema of the output of the transformation.
Returns
None
run(...)​
This method has been replaced by the .run_transformation() method and will be deprecated in a future release.
Parameters
- mock_inputs (Union[Dict[str, Any], pandas.DataFrame, pyspark_dataframe.DataFrame]): (Required) Keyword args with the same expected keys as the RealtimeFeatureView's inputs parameters. For the "python" mode, each input must be a dictionary representing a single row. For the "pandas" mode, each input must be a DataFrame, with all of them containing the same number of rows and matching row ordering.
Returns
Union[Dict[str, Any], TectonDataFrame]: A Dict object for the "python" mode and a Tecton DataFrame of the results for the "pandas" mode.
Examples
# Given a Python Realtime Feature View defined in your workspace:
@realtime_feature_view(
    sources=[transaction_request, user_transaction_amount_metrics],
    mode="python",
    features=features,
    description="The transaction amount is higher than the 1 day average.",
)
def transaction_amount_is_higher_than_average(request, user_metrics):
    return {"higher_than_average": request["amt"] > user_metrics["daily_average"]}


# Retrieve and run the Feature View in a notebook using mock data:
import tecton

fv = tecton.get_workspace("prod").get_feature_view("transaction_amount_is_higher_than_average")
result = fv.run(request={"amt": 100}, user_metrics={"daily_average": 1000})
print(result)
# {'higher_than_average': False}
run_transformation(...)​
Run the RealtimeFeatureView using mock inputs.
Parameters
- input_data (Dict[str, Any]): (Required) Dict with the same expected keys as the RealtimeFeatureView's inputs parameters. For the "python" mode, each value must be a dictionary representing a single row. For the "pandas" mode, each value must be a DataFrame, with all of them containing the same number of rows and matching row ordering.
Returns
Union[Dict[str, Any], TectonDataFrame]: A Dict object for the "python" mode and a Tecton DataFrame of the results for the "pandas" mode.
Examples
# Given a Python Realtime Feature View defined in your workspace:
@realtime_feature_view(
    sources=[transaction_request, user_transaction_amount_metrics],
    mode="python",
    features=features,
    description="The transaction amount is higher than the 1 day average.",
)
def transaction_amount_is_higher_than_average(request, user_metrics):
    return {"higher_than_average": request["amt"] > user_metrics["daily_average"]}


# Retrieve and run the Feature View in a notebook using mock data:
import tecton

fv = tecton.get_workspace("prod").get_feature_view("transaction_amount_is_higher_than_average")
input_data = {"request": {"amt": 100}, "user_metrics": {"daily_average": 1000}}
result = fv.run_transformation(input_data=input_data)
print(result)
# {'higher_than_average': False}
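The input_data dictionary is keyed by the transformation function's parameter names. As a rough mental model only (this is not a description of how Tecton executes the Feature View), the call behaves like unpacking that dictionary into the undecorated function. The helper `run_locally` below is hypothetical and needs no Tecton installation.

```python
from typing import Any, Callable, Dict


def transaction_amount_is_higher_than_average(request, user_metrics):
    # Undecorated copy of the Python mode transformation from the example.
    return {"higher_than_average": request["amt"] > user_metrics["daily_average"]}


def run_locally(fn: Callable[..., Dict[str, Any]], input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: pass each input_data entry as a keyword argument."""
    return fn(**input_data)


input_data = {"request": {"amt": 100}, "user_metrics": {"daily_average": 1000}}
print(run_locally(transaction_amount_is_higher_than_average, input_data))
# {'higher_than_average': False}
```

A key in input_data that does not match a parameter name raises a TypeError in this sketch, which is a useful reminder to keep the keys and the function signature in sync.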
summary()​
Displays a human-readable summary.
test_run(...)
Run the RealtimeFeatureView using mock sources.
Unlike run, test_run is intended for unit testing. It will not make calls to your connected Tecton cluster to validate the RealtimeFeatureView.
Parameters
- mock_inputs (Union[Dict[str, Any], pandas.DataFrame]): (Required) Keyword args with the same expected keys as the RealtimeFeatureView's inputs parameters. For the "python" mode, each input must be a dictionary representing a single row. For the "pandas" mode, each input must be a DataFrame, with all of them containing the same number of rows and matching row ordering.
Returns
Union[Dict[str, Any], pandas.DataFrame]: A Dict object for the "python" mode and a pandas.DataFrame object for the "pandas" mode.
Example
from datetime import datetime, timedelta
import pandas
from fraud.features.batch_features.user_credit_card_issuer import user_credit_card_issuer


# The `tecton_pytest_spark_session` is a PyTest fixture that provides a
# Tecton-defined PySpark session for testing Spark transformations and feature
# views.
def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_session):
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
        }
    )
    input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    # Simulate materializing features for May 1st.
    output = user_credit_card_issuer.test_run(
        start_time=datetime(2022, 5, 1),
        end_time=datetime(2022, 5, 2),
        fraud_users_batch=input_spark_df,
    )

    actual = output.to_pandas()

    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"],
        }
    )

    pandas.testing.assert_frame_equal(actual, expected)
transformation_schema()​
Returns the schema of the output of the transformation.
Returns
List[types.Field]
with_join_key_map()​
Rebind join keys for a Feature View or Feature Table used in a Feature Service.
The keys in join_key_map should be the join keys, and the values should be the feature service overrides.
Parameters
- join_key_map (Dict[str, str]): Dictionary remapping the join key names. Dictionary keys are join keys, values are the feature service override values.
Returns
FeatureReference
Example
from tecton import FeatureService

# The join key for this feature service will be "feature_service_user_id".
feature_service = FeatureService(
    name="feature_service",
    features=[
        my_feature_view.with_join_key_map({"user_id": "feature_service_user_id"}),
    ],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],
        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
    ],
)
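The direction of the mapping is easy to get backwards, so here is a small conceptual sketch in plain Python. It assumes only what the description states: dictionary keys are the Feature View's join keys and values are the Feature Service's names for them. The helper `resolve_view_join_keys` is hypothetical and is not part of the Tecton SDK.

```python
from typing import Any, Dict


def resolve_view_join_keys(join_key_map: Dict[str, str], service_join_keys: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: look up each Feature View join key under its Feature Service name."""
    return {
        view_key: service_join_keys[service_key]
        for view_key, service_key in join_key_map.items()
    }


# Join keys as a caller of the Feature Service would supply them.
request_keys = {"sender_id": "user_1", "recipient_id": "user_2"}

print(resolve_view_join_keys({"user_id": "sender_id"}, request_keys))     # {'user_id': 'user_1'}
print(resolve_view_join_keys({"user_id": "recipient_id"}, request_keys))  # {'user_id': 'user_2'}
```

This is why the same user_features view can appear twice in one service: each reference reads its user_id from a different service-level key.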
with_name()​
Rename a Feature View or Feature Table used in a Feature Service.
Parameters
- namespace (str): The namespace used to prefix the features joined from this FeatureView. By default, namespace is set to the FeatureView name.
Returns
FeatureReference
Examples
from tecton import FeatureService

# The feature view in this feature service will be named "new_named_feature_view" in training data dataframe
# columns and other metadata.
feature_service = FeatureService(
    name="feature_service",
    features=[my_feature_view.with_name("new_named_feature_view")],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],
        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
    ],
)
validate()​
Method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon object creation, so validation is unnecessary.
Returns
None