Version: 0.9

StreamFeatureView

Summary​

A Tecton Stream Feature View, used for transforming and materializing features from a StreamSource.

StreamFeatureView should not be instantiated directly; use the stream_feature_view() decorator instead.

Attributes​

  • aggregations (Optional[configs.Aggregation]) – List of Aggregation configs used by this Feature View.
  • aggregation_secondary_key (Optional[str]) – The column name used as the secondary key for aggregations. See Aggregation Secondary Key docs for more details.
  • batch_schedule (Optional[datetime.timedelta]) – The batch schedule of this Feature View.
  • batch_trigger (BatchTriggerType) – The BatchTriggerType for this Feature View.
  • created_at (Optional[datetime.datetime]) – The time that this Tecton object was created or last updated.
  • defined_in (Optional[str]) – The repo filename where this object was declared.
  • description (str) – Returns the description of the Tecton object.
  • entities (List[specs.EntitySpec]) – The Entities for this Feature View.
  • feature_start_time (Optional[datetime.datetime])
  • id (str) – Returns the unique id of the Tecton object.
  • info – A dataclass containing basic info about this Tecton Object.
  • is_batch_trigger_manual (bool) – Whether this Feature View's batch trigger is BatchTriggerType.Manual.
  • join_keys (List[str]) – The join key column names.
  • max_source_data_delay (datetime.timedelta) – Returns the maximum data delay of input sources for this feature view.
  • name (str) – Returns the name of the Tecton object.
  • online_serving_index (List[str]) – The set of join keys that will be indexed and queryable during online serving.
  • owner (Optional[str]) – Returns the owner of the Tecton object.
  • run_transformation_validation (bool) – Whether Tecton tests Feature View transformations during tecton plan/apply by executing them with an empty DataFrame.
  • sources – The Source inputs for this Feature View.
  • schema (List[types.Field]) – Tecton schema matching the expected output of the transformation.
  • tags (Dict[str, str]) – Returns the tags of the Tecton object.
  • transformations (List[specs.TransformationSpec]) – The Transformations used by this Feature View.
  • url (str) – Returns a link to the Tecton Web UI.
  • wildcard_join_key (Optional[set]) – Returns a wildcard join key column name if it exists; otherwise returns None.
  • workspace (Optional[str]) – Returns the workspace that this Tecton object belongs to.
  • options (Optional[Dict[str, str]]) – A map of additional stream feature view options.

Methods​

  • __init__(...) – Construct a StreamFeatureView.
  • cancel_materialization_job(...) – Cancels the scheduled or running batch materialization job for this Feature View specified by the job identifier.
  • delete_keys(...) – Deletes any materialized data that matches the specified join keys from the FeatureView.
  • deletion_status(...) – Displays information for deletion jobs created with the delete_keys() method, which may include past jobs, scheduled jobs, and job failures.
  • get_feature_columns() – The features produced by this FeatureView.
  • get_features_for_events(...) – Returns a TectonDataFrame of historical values for this Feature View.
  • get_features_in_range(...) – Returns a TectonDataFrame with historical feature values for this Feature View within the input time range.
  • get_historical_features(...) – Returns a TectonDataFrame of historical values for this feature view.
  • get_materialization_job(...) – Retrieves data about the specified materialization job for this Feature View.
  • get_online_features(...) – Returns a single Tecton FeatureVector from the Online Store.
  • get_timestamp_field() – Returns the name of the timestamp field for this Feature View.
  • list_materialization_jobs() – Retrieves the list of all materialization jobs for this Feature View.
  • materialization_status(...) – Displays materialization information for the FeatureView, which may include past jobs, scheduled jobs, and job failures.
  • print_transformation_schema() – Prints the explicit schema or the expected output of the transformation.
  • run(...) – Run the FeatureView.
  • run_stream(...) – Starts a streaming job to keep writing the output records of this FeatureView to a temporary table.
  • run_transformation(...) – Executes the transformation pipeline as is without any aggregations or joins.
  • summary() – Displays a human readable summary.
  • test_run(...) – Run the FeatureView using mock data sources.
  • trigger_materialization_job(...) – Starts a batch materialization job for this Feature View.
  • validate() – Validate this Tecton object and its dependencies (if any).
  • wait_for_materialization_job(...) – Blocks until the specified job has been completed.
  • with_join_key_map(...) – Rebind join keys for a Feature View used in a Feature Service.
  • with_name(...) – Rename a Feature View used in a Feature Service.

__init__(...)​

Construct a StreamFeatureView.

__init__ should not be used directly; use the stream_feature_view() decorator instead.

cancel_materialization_job(...)​

Cancels the scheduled or running batch materialization job for this Feature View specified by the job identifier. Once cancelled, a job will not be retried further.

Job run state will be set to MANUAL_CANCELLATION_REQUESTED. Note that cancellation is asynchronous, so it may take some time to complete. If the job run is already in MANUAL_CANCELLATION_REQUESTED or in a terminal state, this method simply returns the job.

Parameters​

  • job_id (str) – ID string of the materialization job.

Returns​

MaterializationJobData object for the cancelled job.

delete_keys(...)​

Deletes any materialized data that matches the specified join keys from the FeatureView.

This method kicks off a job to delete the data in the offline and online stores. If a FeatureView has multiple entities, the full set of join keys must be specified. Only Delta is supported as the offline store (offline_store=DeltaConfig()). A maximum of 500,000 keys can be deleted per request.

Parameters​

  • keys (Union[pyspark.sql.DataFrame, pandas.DataFrame]) – The DataFrame of join keys to be deleted. Must conform to the FeatureView join keys.

  • online (bool) – (Optional, default=True) Whether or not to delete from the online store.

  • offline (bool) – (Optional, default=True) Whether or not to delete from the offline store.

Returns​

List of ID strings of the created Entity Deletion jobs.
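
For example, a minimal sketch (assuming a Feature View fv with a single join key user_id; the key values are placeholders):

import pandas

# Build a DataFrame containing only the join key column(s) of the Feature View.
keys_to_delete = pandas.DataFrame({"user_id": [42, 43]})

# Kick off deletion jobs against both the online and offline stores.
job_ids = fv.delete_keys(keys=keys_to_delete, online=True, offline=True)
print(job_ids)  # ID strings of the created Entity Deletion jobs.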

deletion_status(...)​

warning

deletion_status is deprecated starting in Tecton 0.8. Instead, the call to delete_keys will return a list of job ids that can be passed into get_materialization_job to see the status of your entity deletion jobs.

Displays information for deletion jobs created with the delete_keys() method, which may include past jobs, scheduled jobs, and job failures.

Parameters​

  • verbose – If set to true, method will display additional low level deletion information, useful for debugging. (Default: False)

  • limit – Maximum number of jobs to return. (Default: 1000)

  • sort_columns – A comma-separated list of column names by which to sort the rows. (Default: None)

  • errors_only – If set to true, method will only return jobs that failed with an error. (Default: False)

get_feature_columns()​

The features produced by this FeatureView.

get_features_for_events(...)​

info

This method is functionally equivalent to get_historical_features(spine) and has been renamed in Tecton 0.8+ for clarity. get_historical_features() will be deprecated in a future release.

Returns a TectonDataFrame of historical values for this Feature View.

If no arguments are passed in, all feature values for this Feature View will be returned in a Tecton DataFrame.

Parameters​

  • events (Union[pyspark.sql.DataFrame, pandas.DataFrame, TectonDataFrame]) – A DataFrame of all possible join keys and timestamps that specify which feature values to fetch. To distinguish between columns in the events DataFrame and feature columns, feature columns are labeled as feature_view_name.feature_name in the returned DataFrame.

  • timestamp_key (str) – Name of the time column in the events DataFrame. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified, will default to the time column of the events DataFrame if there is only one present. If more than one time column is present in the events DataFrame, you must specify which column you’d like to use. (Default: None)

  • from_source (bool) – Whether feature values should be recomputed from the original data source. If None, feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any Feature Views are not materialized. (Default: None)

  • save (bool) – Whether to persist the DataFrame as a Dataset object. (Default: False)

  • save_as (str) – Name to save the DataFrame as. If unspecified and save=True, a name will be generated. (Default: None)

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) – Dictionary for mock inputs that should be used instead of fetching directly from raw data sources. The keys should match the Feature View’s function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False and save/save_as.

  • compute_mode (Union[str, tecton.ComputeMode, None]) – Compute mode to use to produce the data frame. Valid string values are "spark", "snowflake", "athena", and "rift".

Returns​

A TectonDataFrame

Examples​

A FeatureView fv with join key user_id.

  1. fv.get_features_for_events(events) where events=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the events DataFrame.

  2. fv.get_features_for_events(events, save_as='my_dataset') where events=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the events DataFrame. Save the DataFrame as a dataset with the name my_dataset.

  3. fv.get_features_for_events(events, timestamp_key='date_1') where events=pandas.DataFrame({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'date_2': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the 'date_1' column in the events DataFrame.
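
Example 1 above as a runnable sketch (the join key values and timestamps are placeholders):

import pandas
from datetime import datetime

events = pandas.DataFrame(
    {
        "user_id": [1, 2, 3],
        "date": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 5, 3)],
    }
)

# Fetch the latest feature values computed before each row's timestamp.
result = fv.get_features_for_events(events)
print(result.to_pandas())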

get_features_in_range(...)​

Returns a TectonDataFrame of historical values for this Feature View which were valid within the input time range. A feature value is considered to be valid at a specific point in time if the Online Store would have returned that value if queried at that moment in time.

The DataFrame returned by this method contains the following:

  • Entity Join Key Columns

  • Feature Value Columns and

  • The columns _valid_from and _valid_to that specify the time range for which the row of features values is valid. The time range defined by [_valid_from, _valid_to) will never intersect with any other rows for the same join keys.

    • _valid_from (Inclusive)

      The timestamp from which feature values were valid and returned from the Online Feature Store for the corresponding set of join keys.

      _valid_from will never be less than start_time. Values for which _valid_from is equal to start_time may have been valid prior to start_time.

    • _valid_to (Exclusive) The timestamp from which feature values are invalid and no longer returned from the Online Feature Store for the corresponding set of join keys.

      _valid_to will never be greater than end_time. Values for which _valid_to is equal to end_time may be valid beyond end_time.

By default (i.e. from_source=None), this method fetches feature values from the Offline Store for Feature Views that have offline materialization enabled. Otherwise, this method computes feature values directly from the original data source.

Parameters​

  • start_time (datetime.datetime) – The inclusive start time of the time range to compute features for.

  • end_time (datetime.datetime) – The exclusive end time of the time range to compute features for.

  • max_lookback (datetime.timedelta) – [Non-Aggregate Feature Views Only] A performance optimization that configures how far back before start_time to look for events in the raw data. If set, get_features_in_range() may not include all entities with valid feature values in the specified time range, but get_features_in_range() will never return invalid values.

  • entities (Union[pyspark.sql.DataFrame, pandas.DataFrame, TectonDataFrame]) – Filter feature data returned to a set of entity IDs. If specified, this DataFrame should only contain join key columns. (Default: None)

  • from_source (bool) – Whether feature values should be recomputed from the original data source. If None, feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any Feature Views are not materialized. (Default: None)

  • save (bool) – Whether to persist the DataFrame as a Dataset object. Default is False. (Default: False)

  • save_as (str) – Name to save the DataFrame as. If unspecified and save=True, a name will be generated. (Default: None)

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) – Dictionary for mock inputs that should be used instead of fetching directly from raw data sources. The keys should match the Feature View’s function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False and save/save_as.

  • compute_mode (Union[str, tecton.ComputeMode, None]) – Compute mode to use to produce the DataFrame. Valid string values are "spark", "snowflake", "athena", and "rift".

Returns​

A TectonDataFrame with feature values for the requested time range in the format specified above.
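
A minimal sketch (assuming a Feature View fv; the time range is a placeholder):

from datetime import datetime

# Fetch every feature value that was valid at some point during May 2022.
# Each row carries _valid_from (inclusive) and _valid_to (exclusive).
result = fv.get_features_in_range(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 6, 1),
)
print(result.to_pandas())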

get_historical_features(...)​

Returns a TectonDataFrame of historical values for this feature view.

By default (i.e. from_source=None), this method fetches feature values from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data.

If no arguments are passed in, all feature values for this feature view will be returned in a Tecton DataFrame.

info

The timestamp_key parameter is only applicable when a spine is passed in. Parameters start_time, end_time, and entities are only applicable when a spine is not passed in.

info

The ability to run get_historical_features as part of a unit test was added in SDK 0.7. To utilize this, simply provide the mocked data sources in the mock_inputs parameter in a test that is run via tecton test or pytest.

Parameters​

  • spine (Union[pyspark.sql.DataFrame, pandas.DataFrame, TectonDataFrame]) – The spine to join against, as a dataframe. If present, the returned DataFrame will contain rollups for all (join key, temporal key) combinations that are required to compute a full frame from the spine. To distinguish between spine columns and feature columns, feature columns are labeled as feature_view_name.feature_name in the returned DataFrame. If spine is not specified, it'll return a DataFrame of feature values in the specified time range. (Default: None)

  • timestamp_key (str) – Name of the time column in the spine. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified, will default to the time column of the spine if there is only one present. If more than one time column is present in the spine, you must specify which column you’d like to use. (Default: None)

  • start_time (datetime.datetime) – The interval start time from when we want to retrieve features. If no timezone is specified, will default to using UTC. (Default: None)

  • end_time (datetime.datetime) – The interval end time until when we want to retrieve features. If no timezone is specified, will default to using UTC. (Default: None)

  • entities (Union[pyspark.sql.DataFrame, pandas.DataFrame, TectonDataFrame]) – Filter feature data returned to a set of entity IDs. If specified, this DataFrame should only contain join key columns. (Default: None)

  • from_source (bool) – Whether feature values should be recomputed from the original data source. If None, feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data. Use from_source=True to force computing from raw data and from_source=False to error if any Feature Views are not materialized. (Default: None)

  • save (bool) – Whether to persist the DataFrame as a Dataset object. (Default: False)

  • save_as (str) – Name to save the DataFrame as. If unspecified and save=True, a name will be generated. (Default: None)

  • mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) – Dictionary for mock inputs that should be used instead of fetching directly from raw data sources. The keys should match the feature view’s function parameters. For feature views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False and save/save_as.

  • compute_mode (Union[str, tecton.ComputeMode, None]) – Compute mode to use to produce the data frame. Valid string values are "spark", "snowflake", "athena", and "rift".

Returns​

A TectonDataFrame

Examples​

A FeatureView fv with join key user_id.

  1. fv.get_historical_features(spine) where spine=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the spine.

  2. fv.get_historical_features(spine, save_as='my_dataset') where spine=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the spine. Save the DataFrame as a dataset with the name my_dataset.

  3. fv.get_historical_features(spine, timestamp_key='date_1') where spine=pandas.DataFrame({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'date_2': [datetime(...), datetime(...), datetime(...)]}) Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the 'date_1' column in the spine.

  4. fv.get_historical_features(start_time=datetime(...), end_time=datetime(...)) Fetch all historical features from the offline store in the time range specified by start_time and end_time.
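
Example 4 above as a runnable sketch (the time range is a placeholder):

from datetime import datetime

# Fetch all historical feature values materialized for April 2022.
result = fv.get_historical_features(
    start_time=datetime(2022, 4, 1),
    end_time=datetime(2022, 5, 1),
)
print(result.to_pandas())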

get_materialization_job(...)​

Retrieves data about the specified materialization job for this Feature View.

This data includes information about job attempts.

Parameters​

  • job_id (str) – ID string of the materialization job.

Returns​

MaterializationJobData object for the job.

get_online_features(...)​

Returns a single Tecton FeatureVector from the Online Store.

Parameters​

  • join_keys (Mapping[str, Union[int, int64, str, bytes]]) – The join keys to fetch from the online store.

  • include_join_keys_in_response (bool) – Whether to include join keys as part of the response FeatureVector. (Default: False)

Returns​

A FeatureVector of the results.

Examples​

A FeatureView fv with join key user_id.

  1. fv.get_online_features(join_keys={'user_id': 1}) Fetch the latest features from the online store for user 1.

  2. fv.get_online_features(join_keys={'user_id': 1}, include_join_keys_in_response=True) Fetch the latest features from the online store for user 1 and include the join key information (user_id=1) in the returned FeatureVector.
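
Example 1 above as a runnable sketch, converting the result with the FeatureVector to_dict() accessor:

# Fetch the latest feature values for user 1 from the Online Store.
vector = fv.get_online_features(join_keys={"user_id": 1})
print(vector.to_dict())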

get_timestamp_field()​

Returns the name of the timestamp field for this Feature View.

list_materialization_jobs()​

Retrieves the list of all materialization jobs for this Feature View.

Returns​

List of MaterializationJobData objects.

materialization_status(...)​

Displays materialization information for the FeatureView, which may include past jobs, scheduled jobs, and job failures.

This method returns different information depending on the type of FeatureView.

Parameters​

  • verbose – If set to true, method will display additional low level materialization information, useful for debugging. (Default: False)
  • limit – Maximum number of jobs to return. (Default: 1000)
  • sort_columns – A comma-separated list of column names by which to sort the rows. (Default: None)
  • errors_only – If set to true, method will only return jobs that failed with an error. (Default: False)

print_transformation_schema()

Prints the explicit schema or the expected output of the transformation.

  • If the query schema is explicitly provided (via the Feature View schema parameter), this method will return the explicitly defined schema.
  • If the schema is not provided, then this method will return the derived schema:
    • For locally defined objects, this schema will be derived locally by executing the feature view transformation(s).
    • For backend applied objects, this schema will have been derived during tecton plan/apply.

run(...)​

Run the FeatureView. Supports transforming data directly from raw data sources or using mock data.

To run the feature view with data from raw data sources, the environment must have access to the data sources.

info

The ability to call run as part of a unit test was added in SDK 0.7. To utilize this, provide the mocked data sources as part of the mock_sources kwargs in a test that is run via tecton test or pytest.

Parameters​

  • start_time (datetime) – The start time of the time window to materialize.

  • end_time (datetime) – The end time of the time window to materialize.

  • aggregation_level (Optional[str]) – For feature views with aggregations, aggregation_level configures which stage of the aggregation to run up to. The query for Aggregate Feature Views operates in three steps:

    1. The feature view query is run over the provided time range. The user defined transformations are applied over the data source.

    2. The result of #1 is aggregated into tiles the size of the aggregation_interval.

    3. The tiles from #2 are combined to form the final feature values. The number of tiles that are combined is based off of the time_window of the aggregation.

    For testing and debugging purposes, to see the output of #1, use aggregation_level="disabled". For #2, use aggregation_level="partial". For #3, use aggregation_level="full".

    aggregation_level="full" is the default behavior.

  • compute_mode (Union[str, tecton.ComputeMode, None]) – Compute mode to use to produce the data frame. Valid string values are "spark", "snowflake", "athena", and "rift".

  • **mock_sources – kwargs for mock sources that should be used instead of fetching directly from raw data sources. The keys should match the feature view’s function parameters. For feature views with multiple sources, mocking some data sources and using raw data for others is supported.

Returns​

A tecton DataFrame of the results.

Example​

import tecton
import pandas
from datetime import datetime

# Example running a non-aggregate feature view with mock data.
feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_feature_view")

mock_fraud_user_data = pandas.DataFrame(
    {
        "user_id": ["user_1", "user_2", "user_3"],
        "timestamp": [datetime(2022, 5, 1, 0), datetime(2022, 5, 1, 2), datetime(2022, 5, 1, 5)],
        "credit_card_number": [1000, 4000, 5000],
    }
)

result = feature_view.run(
    start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), fraud_users_batch=mock_fraud_user_data
)  # `fraud_users_batch` is the name of this FeatureView's data source parameter.

# Example running an aggregate feature view with real data.
aggregate_feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_aggregate_feature_view")

result = aggregate_feature_view.run(
    start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), aggregation_level="full"
)  # or "partial" or "disabled"

run_stream(...)​

Starts a streaming job to keep writing the output records of this FeatureView to a temporary table.

The job will be running until the execution is terminated.

After records have been written to the table, they can be queried using spark.sql(). If run in a Databricks notebook, Databricks will also automatically visualize the number of incoming records.

Parameters​

  • output_temp_table (str) – The name of the temporary table to write to.

Example​

  1. fv.run_stream(output_temp_table="temp_table") Start a streaming job.

  2. display(spark.sql("SELECT * FROM temp_table LIMIT 5")) Query the output table, and display the output dataframe.

run_transformation(...)​

info

In 0.8+, this method is intended to be a replacement for the .run() method. The .run() method will be deprecated in a future release.

Executes the Feature View transformation pipeline as is without any aggregations or joins. Supports transforming data directly from raw data sources or using mock data.

To run the Feature View with data from raw data sources, the environment must have access to the data sources.

Parameters​

  • start_time (datetime) – The start time of the time window to materialize.

  • end_time (datetime) – The end time of the time window to materialize.

  • mock_inputs (Optional[Dict[str, Union[pyspark.sql.DataFrame, pandas.DataFrame]]]) – Dictionary of mock sources that should be used instead of fetching directly from raw data sources. The keys should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported.

  • compute_mode (Union[str, tecton.ComputeMode, None]) – Compute mode to use to produce the DataFrame. Valid string values are "spark", "snowflake", "athena", and "rift".

Returns​

A Tecton DataFrame of the results.

Example​

import tecton
import pandas
from datetime import datetime

# Example running a non-aggregate Feature View with mock data.
feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_feature_view")

mock_fraud_user_data = pandas.DataFrame(
    {
        "user_id": ["user_1", "user_2", "user_3"],
        "timestamp": [datetime(2022, 5, 1, 0), datetime(2022, 5, 1, 2), datetime(2022, 5, 1, 5)],
        "credit_card_number": [1000, 4000, 5000],
    }
)

result = feature_view.run_transformation(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    mock_inputs={"fraud_users_batch": mock_fraud_user_data},
)  # `fraud_users_batch` is the name of this Feature View's data source parameter.

# Example running an aggregate Feature View with real data.
aggregate_feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_aggregate_feature_view")

result = aggregate_feature_view.run_transformation(start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2))

summary()​

Displays a human readable summary of this Feature View.

test_run(...)​

Run the FeatureView using mock data sources. This requires a local spark session.

warning

test_run is deprecated starting in Tecton 0.7. This functionality has been replaced by run supporting unit tests. However, test_run is still currently maintained for backwards compatibility.

Unlike run(), test_run() is intended for unit testing. It will not make calls to your connected Tecton cluster or attempt to read real data.

Parameters​

  • start_time (datetime) – The start time of the time window to materialize.

  • end_time (datetime) – The end time of the time window to materialize.

  • aggregation_level (Optional[str]) – For feature views with aggregations, aggregation_level configures what stage of the aggregation to run up to.

    The query for Aggregate Feature Views operates in three logical steps:

    1. The feature view query is run over the provided time range. The user defined transformations are applied over the data source.

    2. The result of #1 is aggregated into tiles the size of the aggregation_interval.

    3. The tiles from #2 are combined to form the final feature values. The number of tiles that are combined is based off of the time_window of the aggregation.

    For testing and debugging purposes, to see the output of #1, use aggregation_level="disabled". For #2, use aggregation_level="partial". For #3, use aggregation_level="full".

    aggregation_level="full" is the default behavior.

  • compute_mode (Union[str, tecton.ComputeMode, None]) – Compute mode to use to produce the data frame. Valid string values are "spark", "snowflake", "athena", and "rift".

  • **mock_sources – kwargs with expected same keys as the FeatureView’s inputs parameter. Each input name maps to a Spark DataFrame that should be evaluated for that node in the pipeline.

Returns​

A TectonDataFrame.

Example​

from datetime import datetime, timedelta
import pandas
from fraud.features.batch_features.user_credit_card_issuer import user_credit_card_issuer


# The `tecton_pytest_spark_session` is a PyTest fixture that provides a
# defined PySpark session for testing Spark transformations and feature
# views.
def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_session):
    input_pandas_df = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
        }
    )
    input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    # Simulate materializing features for May 1st.
    output = user_credit_card_issuer.run(
        start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), fraud_users_batch=input_spark_df
    )

    actual = output.to_pandas()

    expected = pandas.DataFrame(
        {
            "user_id": ["user_1", "user_2", "user_3", "user_4"],
            "signup_timestamp": [datetime(2022, 5, 1)] * 4,
            "credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"],
        }
    )

    pandas.testing.assert_frame_equal(actual, expected)

trigger_materialization_job(...)​

Starts a batch materialization job for this Feature View.

Parameters​

  • start_time (datetime) – The job will materialize feature values between the start_time and end_time.

  • end_time (datetime) – The job will materialize feature values between the start_time and end_time.

  • online (bool) – Whether the job will materialize features to the online store. The Feature View must be configured with online=True in order to materialize features online.

  • offline (bool) – Whether the job will materialize features to the offline store. The Feature View must be configured with offline=True in order to materialize features offline.

  • use_tecton_managed_retries (bool) – If enabled, Tecton will automatically retry failed attempts. Disable to manage your own retry behavior. (Default: True)

  • overwrite (bool) – If enabled, allow triggering materialization for periods of time that have previously materialized data. For more details, see here. (Default: False)

Returns​

ID string of the created materialization job.

Raises​

TectonValidationError – If job params are not valid.
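
A minimal backfill sketch combining this method with wait_for_materialization_job() (the time range and store flags are illustrative):

from datetime import datetime

# Trigger a backfill job for May 1st and block until it completes.
job_id = fv.trigger_materialization_job(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    online=False,
    offline=True,
)
job_data = fv.wait_for_materialization_job(job_id)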

validate()​

Validate this Tecton object and its dependencies (if any).

Validation performs most of the same checks and operations as tecton plan.

  1. Check for invalid object configurations, e.g. setting conflicting fields.

  2. For Data Sources and Feature Views, test query code and derive schemas. e.g. test that a Data Source’s specified s3 path exists or that a Feature View’s SQL code executes and produces supported feature data types.

Objects already applied to Tecton do not need to be re-validated on retrieval (e.g. fv = tecton.get_workspace('prod').get_feature_view('my_fv')) since they have already been validated during tecton plan. Locally defined objects (e.g. my_ds = BatchSource(name="my_ds", ...)) may need to be validated before some of their methods can be called, e.g. my_feature_view.get_features_for_events().
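
For example, a sketch of the interactive workflow for a locally defined Feature View (my_feature_view and events are assumed to exist):

# Validate the locally defined object before calling data access methods on it.
my_feature_view.validate()
result = my_feature_view.get_features_for_events(events)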

wait_for_materialization_job(...)​

Blocks until the specified job has been completed.

Parameters​

  • job_id (str) – ID string of the materialization job.

  • timeout (Optional[timedelta]) – (Optional) timeout for this function. An exception is raised if the job does not complete within the specified time. (Default: None)

Returns​

MaterializationJobData object for the successful job.

Raises​

  • MaterializationTimeoutException – If timeout param is specified and job does not complete within the specified time.

  • MaterializationJobFailedException – If materialization job did not reach a successful state.

with_join_key_map(...)

Rebind join keys for a Feature View used in a Feature Service. The keys in join_key_map should be the feature view join keys, and the values should be the feature service overrides.

Parameters

  • join_key_map

Example

from tecton import FeatureService

# The join key for this feature service will be "feature_service_user_id".
feature_service = FeatureService(
    name="feature_service",
    features=[
        my_feature_view.with_join_key_map({"user_id": "feature_service_user_id"}),
    ],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],
        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
    ],
)

with_name(...)​

Rename a Feature View used in a Feature Service.

Parameters​

  • namespace

Example​

from tecton import FeatureService

# The feature view in this feature service will be named "new_named_feature_view" in training data dataframe
# columns and other metadata.
feature_service = FeatureService(
    name="feature_service",
    features=[my_feature_view.with_name("new_named_feature_view")],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],
        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
    ],
)
