StreamFeatureView
Summary
A Tecton Stream Feature View, used for transforming and materializing features from a StreamSource.
The StreamFeatureView class should not be instantiated directly; use the `@stream_feature_view` decorator instead.
Examples
Example Using Attributes
```python
from datetime import datetime, timedelta

# `entities` and `transactions_stream` are modules in your feature repository (not shown).
from entities import user
from transactions_stream import transactions_stream
from tecton import Attribute, stream_feature_view
from tecton.types import Int64, String


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    ttl=timedelta(days=30),
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    features=[
        Attribute("LAST_TRANSACTION_AMOUNT", Int64),
        Attribute("LAST_TRANSACTION_CATEGORY", String),
    ],
    timestamp_field="timestamp",
    run_transformation_validation=True,
    description="Features about the user's most recent transaction in the past 30 days. Updated continuously.",
    tecton_materialization_runtime="1.0.0",
)
def user_last_transaction_features(transactions_stream):
    # In spark_sql mode, the function returns a SQL string; the source is interpolated by name.
    return f"""
        SELECT
            USER_ID,
            TIMESTAMP,
            AMOUNT AS LAST_TRANSACTION_AMOUNT,
            CATEGORY AS LAST_TRANSACTION_CATEGORY
        FROM
            {transactions_stream}
        """
```
Example Using Aggregates
```python
from datetime import datetime, timedelta

# `entities` and `transactions_stream` are modules in your feature repository (not shown).
from entities import user
from transactions_stream import transactions_stream
from tecton import Aggregate, stream_feature_view
from tecton.types import Field, Int64


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=10),
    features=[
        # Mean and sum of AMOUNT over 1-, 24-, and 72-hour windows.
        Aggregate(input_column=Field("AMOUNT", Int64), function="mean", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="mean", time_window=timedelta(hours=24)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="mean", time_window=timedelta(hours=72)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="sum", time_window=timedelta(hours=1)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="sum", time_window=timedelta(hours=24)),
        Aggregate(input_column=Field("AMOUNT", Int64), function="sum", time_window=timedelta(hours=72)),
    ],
    online=True,
    feature_start_time=datetime(2020, 10, 10),
    timestamp_field="timestamp",
    description="Transaction amount statistics and total over a series of time windows, updated every ten minutes.",
    tecton_materialization_runtime="1.0.0",
)
def user_recent_transaction_aggregate_features(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """
```
@stream_feature_view (Decorator)
Declare a Stream Feature View. This decorator is the recommended way to instantiate a StreamFeatureView.
Parameters
- `mode` (`str`) - (Required) Whether the annotated function is a pipeline function (`"pipeline"` mode) or a transformation function (`"spark_sql"` or `"pyspark"` mode). In the non-pipeline modes, an inferred transformation is also registered.
- `source` (`Union[framework_data_source.StreamSource, FilteredSource]`) - (Required) The data source input to the feature view.
- `entities` (`Sequence[framework_entity.Entity]`) - (Required) The entities this feature view is associated with.
- `timestamp_field` (`str`) - The name of the column that holds the timestamp for records produced by the feature view. Required unless exactly one column has a Timestamp type.
- `features` (`Union[Sequence[feature.Aggregate], Sequence[feature.Attribute]]`) - (Required) A list of features this feature view manages.
- `name` (`Optional[str]`) - A unique, human-friendly name that identifies the FeatureView. Defaults to the function name. Default: `None`
- `description` (`Optional[str]`) - A human-readable description. Default: `None`
- `owner` (`Optional[str]`) - Owner name (typically the email of the primary maintainer). Default: `None`
- `tags` (`Optional[Dict[str, str]]`) - Tags associated with this Tecton object (key-value pairs of arbitrary metadata). Default: `None`
- `prevent_destroy` (`bool`) - If `True`, this Tecton object is blocked from being deleted or re-created (that is, a destructive update) during `tecton plan`/`tecton apply`. To remove or update this object, set `prevent_destroy` to `False`, either in the same `tecton apply` or in a separate one. `prevent_destroy` can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or re-creating a Feature View that triggers expensive rematerialization jobs. `prevent_destroy` also blocks changes to dependent Tecton objects that would trigger a re-creation of the tagged object; for example, if `prevent_destroy` is set on a Feature Service, the Feature Views used in that service also cannot be deleted or re-created. `prevent_destroy` is only enforced in live (non-dev) workspaces. Default: `False`
- `aggregation_interval` (`Optional[datetime.timedelta]`) - How frequently the feature value is updated (for example, `timedelta(hours=1)` or `timedelta(hours=6)`). Default: `None`
- `aggregation_secondary_key` (`Optional[str]`) - Configures secondary key aggregates using the specified column. Only valid when using aggregations. Default: `None`
- `stream_processing_mode` (`Optional[StreamProcessingMode]`) - Whether aggregations should be "batched" in time intervals or updated continuously. Continuously aggregated features are fresher but more expensive. One of `StreamProcessingMode.TIME_INTERVAL` or `StreamProcessingMode.CONTINUOUS`. Default: `StreamProcessingMode.TIME_INTERVAL`
- `online` (`bool`) - Whether the feature view should be materialized to the online feature store. Default: `False`
- `offline` (`bool`) - Whether the feature view should be materialized to the offline feature store. Default: `False`
- `ttl` (`Optional[datetime.timedelta]`) - The TTL (or "look back window") for features defined by this feature view. This parameter determines how long features live in the online store and how far to "look back" relative to a training example's timestamp when generating offline training sets. Shorter TTLs improve performance and reduce costs. Default: `None`
- `feature_start_time` (`Optional[datetime.datetime]`) - The time from which materialization for this feature view should start. Required if `offline=True` or `online=True`. Default: `None`
- `lifetime_start_time` (`Optional[datetime.datetime]`) - The start time for the data to include in a lifetime aggregate. Required if using lifetime windows. Default: `None`
- `manual_trigger_backfill_end_time` (`Optional[datetime.datetime]`) - If set, Tecton schedules backfill materialization jobs for this feature view up to this time. Materialization jobs after this point must be triggered manually. Only valid when `batch_trigger` is `BatchTriggerType.MANUAL`. Default: `None`
- `batch_trigger` (`BatchTriggerType`) - Defines the mechanism for initiating batch materialization jobs. One of `BatchTriggerType.SCHEDULED` or `BatchTriggerType.MANUAL`. The default value is `BatchTriggerType.SCHEDULED`, in which case Tecton runs materialization jobs on the schedule defined by the `batch_schedule` parameter. If set to `BatchTriggerType.MANUAL`, batch materialization jobs must be initiated explicitly by the user through either the Tecton SDK or the Airflow operator. Default: `None`
- `batch_schedule` (`Optional[datetime.timedelta]`) - The interval at which batch materialization should be scheduled. Default: `None`
- `online_serving_index` (`Optional[Sequence[str]]`) - (Advanced) Defines the set of join keys that will be indexed and queryable during online serving. Default: `None`
- `batch_compute` (`Optional[configs.ComputeConfigTypes]`) - Batch materialization cluster configuration. Default: `None`
- `stream_compute` (`Optional[configs.ComputeConfigTypes]`) - Streaming materialization cluster configuration. Default: `None`
- `offline_store` (`Optional[Union[configs.OfflineStoreConfig, configs.ParquetConfig, configs.DeltaConfig]]`) - Configuration for how data is written to the offline feature store. Default: `None`
- `online_store` (`Optional[configs.OnlineStoreTypes]`) - Configuration for how data is written to the online feature store. Default: `None`
- `monitor_freshness` (`bool`) - If `True`, enables monitoring when feature data is materialized to the online feature store. Default: `False`
- `data_quality_enabled` (`Optional[bool]`) - If `False`, disables data quality metric computation and the data quality dashboard. Default: `None`
- `skip_default_expectations` (`Optional[bool]`) - If `True`, skips validating default expectations on the feature data. Default: `None`
- `expected_feature_freshness` (`Optional[datetime.timedelta]`) - Threshold used to determine whether recently materialized feature data is stale. Data is stale if `now - most_recent_feature_value_timestamp > expected_feature_freshness`. For feature views using Tecton aggregations, data is stale if `now - round_up_to_aggregation_interval(most_recent_feature_value_timestamp) > expected_feature_freshness`, where `round_up_to_aggregation_interval()` rounds the feature timestamp up to the end of the `aggregation_interval`. The value must be at least 2 times `aggregation_interval`. If not specified, a value determined by the Tecton backend is used. Default: `None`
- `alert_email` (`Optional[str]`) - Email address that alerts for this FeatureView will be sent to. Default: `None`
- `max_backfill_interval` (`Optional[datetime.timedelta]`) - (Advanced) The time interval that each backfill job covers when materializing feature data. This determines the number of backfill jobs, which is (`<feature registration time>` - `feature_start_time`) / `max_backfill_interval`. Configuring `max_backfill_interval` appropriately helps optimize large backfill jobs. If this parameter is not specified, 10 backfill jobs run (the default). Default: `None`
- `max_batch_aggregation_interval` (`Optional[datetime.timedelta]`) - Deprecated. Use `max_backfill_interval` instead, which has exactly the same usage. Default: `None`
- `output_stream` (`Optional[configs.OutputStream]`) - Configuration for a stream to write feature outputs to, specified as a `tecton.framework.configs.KinesisOutputStream` or `tecton.framework.configs.KafkaOutputStream`. Default: `None`
- `options` (`Optional[Dict[str, str]]`) - Additional options to configure the Feature View. Used for advanced use cases and beta features. Default: `None`
- `run_transformation_validation` (`Optional[bool]`) - If `True`, Tecton executes the Feature View transformations during `tecton plan`/`tecton apply` validation. If `False`, Tecton does not execute the transformations during validation, and `schema` must be set. Skipping query validation can be useful to speed up `tecton plan`/`tecton apply` or for Feature Views that have issues with Tecton's validation (for example, some pip dependencies). Defaults to `True` for Spark and Snowflake Feature Views and to `False` for Python and Pandas Feature Views or Feature Views with Push Sources. Default: `None`
- `tecton_materialization_runtime` (`Optional[str]`) - Version of the `tecton` package used by your job cluster. Default: `None`
- `cache_config` (`Optional[configs.CacheConfig]`) - Cache configuration for the Feature View. Including this option enables the feature server to use the cache when retrieving features for this feature view. It is only respected if the Feature Service containing this feature view has `enable_online_caching` set to `True`. Default: `None`
- `stream_compaction_enabled` (`Optional[bool]`) - Deprecated. Use `stream_tiling_enabled` instead, which has exactly the same usage. Default: `None`
- `batch_compaction_enabled` (`Optional[bool]`) - Deprecated. Use `compaction_enabled` instead, which has exactly the same usage. Default: `None`
- `compaction_enabled` (`Optional[bool]`) - (Private preview) If `True`, Tecton runs a compaction job after each batch materialization job to write to the online store. This requires DynamoDB and uses the ImportTable API. Because each batch job overwrites the online store, a larger compute cluster may be required. Must be `True` if `stream_compaction_enabled` is `True`. Default: `False`
- `stream_tiling_enabled` (`Optional[bool]`) - (Private preview) If `False`, Tecton transforms and writes all events from the stream to the online store (same as `stream_processing_mode=StreamProcessingMode.CONTINUOUS`). If `True`, Tecton stores partial aggregations of the events in the online store. Default: `False`
- `environment` (`Optional[str]`) - The custom environment in which materialization jobs run. If `None`, jobs execute in the default Tecton environment. Default: `None`
- `context_parameter_name` (`Optional[str]`) - Name of the function parameter into which Tecton injects the `MaterializationContext` object. Default: `None`
- `aggregation_leading_edge` (`Optional[AggregationLeadingEdge]`) - (Advanced) Specifies the timestamp used for the leading edge of aggregation time windows. This parameter only affects online serving. See the AggregationLeadingEdge class documentation or the Tecton docs for more information. Defaults to `AggregationLeadingEdge.WALL_CLOCK_TIME`. Default: `None`
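The staleness rule given for `expected_feature_freshness` can be written out directly. The following is a plain-Python restatement of that rule, not Tecton's monitoring code. `is_stale` is a hypothetical helper. `round_up_to_aggregation_interval` is modeled as a ceiling onto an interval grid anchored at the Unix epoch. Both that anchoring and the choice to leave already-aligned timestamps unchanged are assumptions.

```python
from datetime import datetime, timedelta
from typing import Optional

EPOCH = datetime(1970, 1, 1)  # assumed anchor for the aggregation interval grid


def round_up_to_aggregation_interval(ts: datetime, interval: timedelta) -> datetime:
    """Round `ts` up to the next multiple of `interval` (unchanged if already aligned)."""
    elapsed = ts - EPOCH
    intervals = -((-elapsed) // interval)  # ceiling division on timedeltas
    return EPOCH + intervals * interval


def is_stale(
    now: datetime,
    most_recent_feature_value_timestamp: datetime,
    expected_feature_freshness: timedelta,
    aggregation_interval: Optional[timedelta] = None,
) -> bool:
    """Apply the documented staleness rule, rounding up for aggregate features."""
    ts = most_recent_feature_value_timestamp
    if aggregation_interval is not None:
        # The documented constraint: freshness must be at least 2x the interval.
        if expected_feature_freshness < 2 * aggregation_interval:
            raise ValueError("expected_feature_freshness must be at least 2 * aggregation_interval")
        ts = round_up_to_aggregation_interval(ts, aggregation_interval)
    return now - ts > expected_feature_freshness


now = datetime(2024, 1, 1, 12, 0)
last_value = datetime(2024, 1, 1, 9, 30)
plain = is_stale(now, last_value, timedelta(hours=2))  # True: the value is 2.5 hours old
aggregated = is_stale(now, last_value, timedelta(hours=2), timedelta(hours=1))  # False: 09:30 rounds up to 10:00
```

Rounding up gives aggregate features credit for the whole interval in which their latest value falls. As a result, the same raw timestamp can be stale for an attribute feature but fresh for an aggregate feature.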
Returns
An object of type `StreamFeatureView`.
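The `ttl` parameter described above bounds how far back a lookup may reach relative to a given timestamp. For example, `ttl=timedelta(days=30)` means that a value older than 30 days no longer counts. The following is a simplified, hypothetical model of that lookback for a single join key, written in plain Python. `lookup_with_ttl` is an illustrative name, not part of the Tecton SDK. Treating an age of exactly `ttl` as still valid is an assumption.

```python
from datetime import datetime, timedelta
from typing import Any, List, Optional, Tuple

# Hypothetical, simplified model of TTL lookback; not Tecton's implementation.
# Each record is (feature_timestamp, value) for one join key.
Record = Tuple[datetime, Any]


def lookup_with_ttl(records: List[Record], as_of: datetime, ttl: timedelta) -> Optional[Any]:
    """Return the most recent value at or before `as_of` that is at most `ttl` old."""
    live = [
        (ts, value)
        for ts, value in records
        # Skip future values and values older than the look-back window.
        # Treating an age of exactly `ttl` as still live is an assumption.
        if ts <= as_of and as_of - ts <= ttl
    ]
    if not live:
        return None  # Nothing within the TTL, so the feature value is missing.
    return max(live, key=lambda record: record[0])[1]


history = [(datetime(2024, 1, 1), 25), (datetime(2024, 1, 20), 40)]
ttl = timedelta(days=30)
recent = lookup_with_ttl(history, datetime(2024, 1, 25), ttl)  # 40
expired = lookup_with_ttl(history, datetime(2024, 3, 1), ttl)  # None: the latest value is 41 days old
```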
StreamFeatureView (Class)
Attributes
Name | Data Type | Description |
---|---|---|
aggregation_interval | Optional[datetime.timedelta] | How frequently the feature values are updated. |
aggregation_secondary_key | Optional[str] | Configures secondary key aggregates using the specified column. |
aggregations | List[configs.Aggregation] | List of Aggregation configs used by this Feature View. |
alert_email | Optional[str] | Email address that alerts for this FeatureView will be sent to. |
batch_schedule | Optional[datetime.timedelta] | The batch schedule of this Feature View. |
batch_trigger | BatchTriggerType | The BatchTriggerType for this FeatureView. |
cache_config | Optional[configs.CacheConfig] | Cache configuration, used for this Feature View if online caching is enabled. |
compaction_enabled | bool | (Private preview) Runs a compaction job after materialization; requires DynamoDB and the ImportTable API. |
context_parameter_name | Optional[str] | Name of the function parameter into which Tecton injects the MaterializationContext object. |
created_at | Optional[datetime.datetime] | Returns the time that this Tecton object was created or last updated. None for locally defined objects. |
defined_in | Optional[str] | The repo filename where this object was declared. None for locally defined objects. |
description | Optional[str] | Returns the description of the Tecton object. |
entities | | |
environment | Optional[str] | The custom environment in which materialization jobs run. Defaults to None, which means jobs execute in the default Tecton environment. |
expected_feature_freshness | Optional[pendulum.Duration] | Threshold used to determine whether recently materialized feature data is stale. |
feature_metadata | List[FeatureMetadata] | |
feature_start_time | Optional[datetime.datetime] | |
id | str | Returns the unique id of the Tecton object. |
incremental_backfills | bool | Backfills incrementally from feature_start_time to the current time, one interval at a time. |
info | | |
is_batch_trigger_manual | bool | Whether this Feature View's batch trigger is BatchTriggerType.MANUAL. |
join_keys | List[str] | The join key column names. |
manual_trigger_backfill_end_time | Optional[pendulum.datetime] | If set, Tecton will schedule backfill materialization jobs for this Feature View up to this time. |
max_backfill_interval | Optional[pendulum.Duration] | (Advanced) The time interval that each backfill job covers when materializing feature data. This determines the number of backfill jobs, which is (feature registration time - feature_start_time) / max_backfill_interval. Configuring max_backfill_interval appropriately helps optimize large backfill jobs. If this parameter is not specified, 10 backfill jobs run (the default). |
max_source_data_delay | datetime.timedelta | Returns the maximum data delay of input sources for this feature view. |
monitor_freshness | bool | If true, enables monitoring when feature data is materialized to the online feature store. |
name | str | Returns the name of the Tecton object. |
offline | bool | Whether the Feature View is materialized to the offline feature store. |
offline_store | Optional[Union[configs.DeltaConfig, configs.ParquetConfig]] | Configuration for the Offline Store of this Feature View. |
online | bool | Whether the Feature View is materialized to the online feature store. |
online_serving_index | List[str] | The set of join keys that will be indexed and queryable during online serving. Defaults to the complete set of join keys. |
output_stream | | Configuration for a stream to write feature outputs to, specified as a tecton.framework.configs.KinesisOutputStream or tecton.framework.configs.KafkaOutputStream. |
owner | Optional[str] | Returns the owner of the Tecton object. |
prevent_destroy | bool | If set to True, Tecton will block destructive actions taken on this Feature View or Feature Table. |
published_features_path | Optional[str] | The location of published features in the offline store. |
sources | | |
stream_tiling_enabled | bool | (Private preview) If False, Tecton transforms and writes all events from the stream to the online store (same as stream_processing_mode=StreamProcessingMode.CONTINUOUS). If True, Tecton stores partial aggregations of the events in the online store. Defaults to False. |
tags | Dict[str, str] | Returns the tags of the Tecton object. |
tecton_materialization_runtime | Optional[str] | Version of the tecton package used by your job cluster. |
timestamp_field | Optional[str] | Timestamp column name for records from the feature view. |
transformations | | |
ttl | Duration | The TTL (or look back window) for features defined by this Feature View. This parameter determines how long features will live in the online store and how far to look back relative to a training example's timestamp when generating offline training sets. |
url | str | Returns a link to the Tecton Web UI. |
wildcard_join_key | Optional[set] | Returns a wildcard join key column name if it exists; otherwise returns None. |
workspace | Optional[str] | Returns the workspace that this Tecton object belongs to. None for locally defined objects. |
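The backfill job count described for `max_backfill_interval` is a simple division. The following restates it in plain Python. `estimated_backfill_jobs` is a hypothetical helper. Rounding a partial final interval up to a whole job is an assumption, because the formula does not say how remainders are handled.

```python
import math
from datetime import datetime, timedelta
from typing import Optional

DEFAULT_BACKFILL_JOBS = 10  # documented default when max_backfill_interval is not set


def estimated_backfill_jobs(
    registration_time: datetime,
    feature_start_time: datetime,
    max_backfill_interval: Optional[timedelta] = None,
) -> int:
    """Estimate (registration_time - feature_start_time) / max_backfill_interval."""
    if max_backfill_interval is None:
        return DEFAULT_BACKFILL_JOBS
    if max_backfill_interval <= timedelta(0):
        raise ValueError("max_backfill_interval must be positive")
    backfill_span = registration_time - feature_start_time
    # Assumption: a partial final interval still needs its own job.
    return math.ceil(backfill_span / max_backfill_interval)


start = datetime(2024, 1, 1)
registered = datetime(2024, 1, 31)  # 30 days of history to backfill
weekly = estimated_backfill_jobs(registered, start, timedelta(days=7))  # 5 (4 full weeks plus 2 days)
ten_day = estimated_backfill_jobs(registered, start, timedelta(days=10))  # 3
unset = estimated_backfill_jobs(registered, start)  # 10
```

Following the formula, a larger `max_backfill_interval` produces fewer but longer jobs, and a smaller one produces more, shorter jobs.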
Methods
Name | Description |
---|---|
cancel_materialization_job(...) | Cancels the scheduled or running job by the job identifier. |
delete_keys(...) | Deletes any materialized data that matches the specified join keys from the FeatureView. |
get_feature_columns() | Retrieves the list of feature columns produced by this FeatureView. |
get_features_for_events(...) | Returns a TectonDataFrame of historical values for this feature view. |
get_features_in_range(...) | Returns a TectonDataFrame of historical values for this feature view which were valid within the input time range. |
get_historical_features(...) | Returns a TectonDataFrame of historical values for this feature view. |
get_job(...) | Retrieves data about the specified job (materialization or dataset generation). |
get_materialization_job(...) | Retrieves data about the specified materialization job. |
get_online_features(...) | Returns a single Tecton tecton.FeatureVector from the Online Store. |
get_partial_aggregates(...) | Returns the partially aggregated tiles in between start_time and end_time for a Feature View that uses the Tecton Aggregation Engine. |
get_timestamp_field() | Returns the name of the timestamp field for this Feature View. |
list_jobs() | Retrieves the list of all jobs (materialization and dataset generation) for this Feature View or Feature Table. |
list_materialization_jobs() | Retrieves the list of all materialization jobs for this feature view. |
materialization_status(...) | Displays materialization information for the FeatureView, which may include past jobs, scheduled jobs, and job failures. |
print_transformation_schema() | Prints the schema of the output of the transformation. |
run(...) | Run the FeatureView. Supports transforming data directly from raw data sources or using mock data. |
run_stream(...) | Starts a streaming job to keep writing the output records of this FeatureView to a temporary table. |
run_transformation(...) | Run the FeatureView Transformation as is without any aggregations or joins. Supports transforming data directly from raw data sources or using mock data. |
summary() | Displays a human-readable summary. |
test_run(...) | Run the FeatureView using mock data sources. This requires a local spark session. |
transformation_schema() | Returns the schema of the output of the transformation. |
trigger_materialization_job(...) | Starts a batch materialization job for this Feature View. |
validate() | Method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon object creation, so validation is unnecessary. |
wait_for_materialization_job(...) | Blocks until the specified job has been completed. |
with_join_key_map(...) | Rebind join keys for a Feature View or Feature Table used in a Feature Service. |
with_name(...) | Rename a Feature View or Feature Table used in a Feature Service. |
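The `get_partial_aggregates` method above returns the partially aggregated tiles for a Feature View that uses the Tecton Aggregation Engine. As a rough mental model (an assumption, not a description of the engine), events are reduced into one partial aggregate per `aggregation_interval`, and each time window is then assembled from those tiles. The following sketch models only `sum` in plain Python, using the 10-minute interval from the aggregate example. `build_tiles` and `window_sum` are hypothetical names. Tecton's window alignment and leading-edge behavior (see `aggregation_leading_edge`) may differ.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple

# Hypothetical model of interval ("tiled") aggregation for `sum`; not Tecton's engine.
EPOCH = datetime(1970, 1, 1)


def tile_start(ts: datetime, interval: timedelta) -> datetime:
    """Round a timestamp down to the start of its aggregation interval."""
    return EPOCH + ((ts - EPOCH) // interval) * interval


def build_tiles(events: Iterable[Tuple[datetime, float]], interval: timedelta) -> Dict[datetime, float]:
    """Reduce raw (timestamp, amount) events to one partial sum per interval."""
    tiles: Dict[datetime, float] = defaultdict(float)
    for ts, amount in events:
        tiles[tile_start(ts, interval)] += amount
    return dict(tiles)


def window_sum(tiles: Dict[datetime, float], end: datetime, window: timedelta) -> float:
    """Sum the tiles whose start falls in [end - window, end)."""
    return sum(total for start, total in tiles.items() if end - window <= start < end)


events = [
    (datetime(2024, 1, 1, 9, 20), 100.0),
    (datetime(2024, 1, 1, 10, 3), 5.0),
    (datetime(2024, 1, 1, 10, 7), 7.0),
    (datetime(2024, 1, 1, 10, 15), 3.0),
]
tiles = build_tiles(events, timedelta(minutes=10))  # three tiles: 09:20, 10:00, 10:10
end = datetime(2024, 1, 1, 10, 20)
last_30_min = window_sum(tiles, end, timedelta(minutes=30))  # 5 + 7 + 3 = 15.0
last_hour = window_sum(tiles, end, timedelta(hours=1))  # 100 + 15 = 115.0
```

A `mean` could be assembled the same way by keeping a partial sum and a partial count per tile and dividing only at window time.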
cancel_materialization_job(...)
Cancels the scheduled or running job by the job identifier. Once cancelled, a job will not be retried further.
The job run state will be set to `MANUAL_CANCELLATION_REQUESTED`. Note that cancellation is asynchronous, so it may take some time for the cancellation to complete. If the job run is already in `MANUAL_CANCELLATION_REQUESTED` or in a terminal state, the method returns the job.
Parameters
- `job_id` (`str`) - ID string of the materialization job.
Returns
`MaterializationJob`: JobData object for the cancelled job.
delete_keys(...)
Deletes any materialized data that matches the specified join keys from the FeatureView.
This method kicks off a job to delete the data in the offline and online stores. If a FeatureView has multiple entities, the full set of join keys must be specified. Only Delta is supported as the offline store (`offline_store=DeltaConfig()`).
A maximum of 500,000 keys can be deleted per request.
Parameters
- `keys` (`Union[pyspark_dataframe.DataFrame, pandas.DataFrame]`) - The DataFrame of keys to delete. Must conform to the FeatureView join keys.
- `online` (`bool`) - Whether to delete from the online store. Default: `True`
- `offline` (`bool`) - Whether to delete from the offline store. Default: `True`
Returns
`List[str]`: List of job IDs for the jobs created for entity deletion.
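Two constraints apply to `delete_keys`: a single request is capped at 500,000 keys, and multi-entity Feature Views need the full set of join keys. Larger deletions therefore have to be split up. The following plain-Python sketch shows one way to check and batch key rows before building a DataFrame for each call. `batch_keys` and `validate_key_rows` are hypothetical helpers, not part of the Tecton SDK. Rows are modeled as plain dictionaries rather than DataFrames.

```python
from typing import Any, Dict, Iterator, List, Sequence

MAX_KEYS_PER_REQUEST = 500_000  # per-request limit stated for delete_keys

KeyRow = Dict[str, Any]


def validate_key_rows(rows: Sequence[KeyRow], join_keys: Sequence[str]) -> None:
    """Raise if any row does not contain exactly the Feature View's join keys."""
    expected = set(join_keys)
    for i, row in enumerate(rows):
        if set(row) != expected:
            raise ValueError(f"row {i} has columns {sorted(row)}, expected {sorted(expected)}")


def batch_keys(rows: Sequence[KeyRow], batch_size: int = MAX_KEYS_PER_REQUEST) -> Iterator[List[KeyRow]]:
    """Yield consecutive batches of at most `batch_size` key rows."""
    if not 0 < batch_size <= MAX_KEYS_PER_REQUEST:
        raise ValueError("batch_size must be between 1 and MAX_KEYS_PER_REQUEST")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


keys = [{"user_id": i, "merchant_id": i % 3} for i in range(5)]
validate_key_rows(keys, ["user_id", "merchant_id"])
sizes = [len(batch) for batch in batch_keys(keys, batch_size=2)]  # [2, 2, 1]
```

Each batch could then be converted with `pandas.DataFrame(batch)` and passed as the `keys` argument of `delete_keys`, with one call per batch.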
get_feature_columns(...)
Retrieves the list of feature columns produced by this FeatureView.
Returns
`List[str]`: The features produced by this FeatureView.
get_features_for_events(...)
This method is functionally equivalent to `get_historical_features(spine)` and was renamed in Tecton 0.8 for clarity. `get_historical_features()` is planned to be deprecated in a future release.
Returns a TectonDataFrame of historical values for this feature view.
By default (that is, `from_source=None`), this method fetches feature values from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data.
Parameters
- `events` (`Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame, str]`) - A DataFrame containing all possible join key combinations and timestamps specifying which feature values to fetch. The returned DataFrame includes rollups for all (join key, timestamp) combinations necessary to compute a complete dataset. To differentiate between event columns and feature columns, feature columns are labeled as `feature_view_name.feature_name` in the returned DataFrame.
- `timestamp_key` (`Optional[str]`) - Name of the time column in the `events` DataFrame. This method fetches the latest features computed before the specified timestamps in this column. If unspecified, it defaults to the time column of the `events` DataFrame if there is only one. If more than one time column is present in the `events` DataFrame, you must specify which column to use. Default: `None`
- `from_source` (`Optional[bool]`) - Whether feature values should be recomputed from the original data source. If `None`, feature values are fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computed on the fly from raw data. Use `from_source=True` to force computing from raw data and `from_source=False` to raise an error if any Feature Views are not materialized. Default: `None`
- `mock_inputs` (`Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]`) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources while using raw data for others is supported. Using `mock_inputs` is incompatible with `from_source=False`. Default: `None`
- `compute_mode` (`Optional[Union[ComputeMode, str]]`) - Compute mode to use to produce the DataFrame. Valid examples include `spark` and `rift`. Default: `None`
Returns
`TectonDataFrame`: A TectonDataFrame.
Examples
A FeatureView `fv` with join key `user_id`.
- `fv.get_features_for_events(events)` where `events=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]})`
  Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the `events` DataFrame.
- `fv.get_features_for_events(events, save_as='my_dataset')` where `events=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]})`
  Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the `events` DataFrame. Save the DataFrame as a dataset with the name `my_dataset`.
- `fv.get_features_for_events(events, timestamp_key='date_1')` where `events=pandas.DataFrame({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'date_2': [datetime(...), datetime(...), datetime(...)]})`
  Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the `date_1` column of the `events` DataFrame.
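The examples above rely on a point-in-time join. For each event row, the method returns the latest feature values computed before that row's timestamp. Feature columns are prefixed with the feature view name. The following plain-Python sketch models that behavior for a single join key. `point_in_time_join` is a hypothetical helper. It ignores TTLs, aggregations, and offline-store details. It also treats "before" as strictly earlier, which is an assumption about boundary handling.

```python
from datetime import datetime
from typing import Any, Dict, List

Row = Dict[str, Any]


def point_in_time_join(
    events: List[Row],
    feature_rows: List[Row],
    join_key: str,
    timestamp_key: str,
    feature_timestamp: str,
    feature_view_name: str,
) -> List[Row]:
    """Attach the latest feature values computed strictly before each event's timestamp."""
    feature_names = sorted({col for row in feature_rows for col in row} - {join_key, feature_timestamp})
    joined_rows = []
    for event in events:
        # Only rows for the same entity that were computed before the event are eligible.
        prior = [
            row
            for row in feature_rows
            if row[join_key] == event[join_key] and row[feature_timestamp] < event[timestamp_key]
        ]
        latest = max(prior, key=lambda row: row[feature_timestamp]) if prior else {}
        joined = dict(event)
        for name in feature_names:
            # Feature columns are labeled feature_view_name.feature_name; missing values are None.
            joined[f"{feature_view_name}.{name}"] = latest.get(name)
        joined_rows.append(joined)
    return joined_rows


events = [
    {"user_id": 1, "date": datetime(2024, 1, 10)},
    {"user_id": 2, "date": datetime(2024, 1, 10)},
]
features = [
    {"user_id": 1, "timestamp": datetime(2024, 1, 5), "amount": 10},
    {"user_id": 1, "timestamp": datetime(2024, 1, 9), "amount": 20},
    {"user_id": 1, "timestamp": datetime(2024, 1, 11), "amount": 99},  # after the event, so ignored
]
result = point_in_time_join(events, features, "user_id", "date", "timestamp", "fv")
# result[0]["fv.amount"] == 20; result[1]["fv.amount"] is None (no history for user 2)
```

The nested scan is quadratic and only meant to make the semantics explicit. A real implementation would sort by timestamp or use an as-of join.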
get_features_in_range(...)
Returns a TectonDataFrame of historical values for this feature view that were valid within the input time range.
A feature value is considered valid at a specific point in time if the Online Store would have returned that value if queried at that moment.
The DataFrame returned by this method contains the following:
- Entity join key columns
- Feature value columns
- The columns `_valid_from` and `_valid_to`, which specify the time range for which the row of feature values is valid. The time range [`_valid_from`, `_valid_to`) never intersects with that of any other row for the same join keys.
  - `_valid_from` (inclusive) is the timestamp from which feature values were valid and returned from the Online Feature Store for the corresponding set of join keys. `_valid_from` will never be less than `start_time`. Values for which `_valid_from` is equal to `start_time` may have been valid prior to `start_time`.
  - `_valid_to` (exclusive) is the timestamp from which feature values are invalid and no longer returned from the Online Feature Store for the corresponding set of join keys. `_valid_to` will never be greater than `end_time`. Values for which `_valid_to` is equal to `end_time` may be valid beyond `end_time`.

By default (that is, `from_source=None`), this method fetches feature values from the Offline Store for Feature Views that have offline materialization enabled. Otherwise, this method computes feature values directly from the original data source.
Parameters
- `start_time` (`datetime.datetime`) - The inclusive start time of the time range to compute features for.
- `end_time` (`datetime.datetime`) - The exclusive end time of the time range to compute features for.
- `entities` (`Optional[Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame]]`) - A DataFrame that is used to filter down feature values. If specified, this DataFrame should contain only join key columns. Default: `None`
- `max_lookback` (`Optional[datetime.timedelta]`) - [Non-Aggregate Feature Views Only] A performance optimization that configures how far back before `start_time` to look for events in the raw data. If set, `get_features_in_range()` may not include all entities with valid feature values in the specified time range, but it will never return invalid values. Default: `None`
- `from_source` (`Optional[bool]`) - Whether feature values should be recomputed from the original data source. If `None`, feature values are fetched from the Offline Store for Feature Views that have offline materialization enabled and otherwise computed on the fly from raw data. Use `from_source=True` to force computing from raw data and `from_source=False` to raise an error if any Feature Views are not materialized. Default: `None`
- `mock_inputs` (`Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]`) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources while using raw data for others is supported. Using `mock_inputs` is incompatible with `from_source=False`. Default: `None`
- `compute_mode` (`Optional[Union[ComputeMode, str]]`) - Compute mode to use to produce the DataFrame. Examples include `spark` and `rift`. Default: `None`
Returns
`TectonDataFrame`: A TectonDataFrame with feature values for the requested time range in the format specified above.
get_historical_features(...)
The timestamp_key parameter is only applicable when a spine is passed in. Parameters start_time, end_time, and entities are only applicable when a spine is not passed in.
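The note above restricts how arguments combine. `timestamp_key` applies only with a spine, and `start_time`, `end_time`, and `entities` apply only without one. The following hypothetical plain-Python checker makes those rules explicit. `check_historical_feature_args` is not part of the Tecton SDK. The note does not say whether Tecton raises an error or ignores an inapplicable argument, so raising `ValueError` here is an assumption of the sketch.

```python
from typing import Any, Optional


def check_historical_feature_args(
    spine: Optional[Any] = None,
    timestamp_key: Optional[str] = None,
    start_time: Optional[Any] = None,
    end_time: Optional[Any] = None,
    entities: Optional[Any] = None,
) -> str:
    """Return "spine" or "time_range", raising ValueError for combinations the note rules out."""
    if spine is not None:
        range_args = {"start_time": start_time, "end_time": end_time, "entities": entities}
        conflicting = [name for name, value in range_args.items() if value is not None]
        if conflicting:
            raise ValueError(f"only applicable without a spine: {', '.join(conflicting)}")
        return "spine"
    if timestamp_key is not None:
        raise ValueError("timestamp_key is only applicable with a spine")
    return "time_range"


with_spine = check_historical_feature_args(spine="events_table", timestamp_key="date")  # "spine"
by_range = check_historical_feature_args(start_time="2024-01-01", end_time="2024-02-01")  # "time_range"

error_message = ""
try:
    check_historical_feature_args(spine="events_table", start_time="2024-01-01")
except ValueError as err:
    error_message = str(err)  # "only applicable without a spine: start_time"
```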
The ability to run `get_historical_features` as part of a unit test was added in SDK 0.7. To use it, provide the mocked data sources in the `mock_inputs` parameter in a test that is run via `tecton test` or `pytest`.
Returns a TectonDataFrame of historical values for this feature view.
By default (that is, `from_source=None`), this method fetches feature values from the Offline Store for Feature Views that have offline materialization enabled and otherwise computes feature values on the fly from raw data.
If no arguments are passed in, all feature values for this feature view are returned in a TectonDataFrame.
Parameters
spine (Optional[Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame, str]]) - The spine to join against, as a DataFrame. If present, the returned DataFrame will contain rollups for all (join key, temporal key) combinations that are required to compute a full frame from the spine. To distinguish between spine columns and feature columns, feature columns are labeled as feature_view_name.feature_name in the returned DataFrame. If spine is not specified, a DataFrame of feature values in the specified time range is returned. Default: None
timestamp_key (Optional[str]) - Name of the time column in the spine. This method will fetch the latest features computed before the specified timestamps in this column. If unspecified, it defaults to the spine's time column when only one is present. If more than one time column is present in the spine, you must specify which column to use. Default: None
start_time (Optional[datetime.datetime]) - The start of the interval for which to retrieve features. If no timezone is specified, UTC is used. Default: None
end_time (Optional[datetime.datetime]) - The end of the interval for which to retrieve features. If no timezone is specified, UTC is used. Default: None
entities (Optional[Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame]]) - Filters the returned feature data to a set of entity IDs. If specified, this DataFrame should only contain join key columns. Default: None
from_source (Optional[bool]) - Whether feature values should be recomputed from the original data source. If None, feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled, and otherwise computed on the fly from raw data. Use from_source=True to force computing from raw data, and from_source=False to raise an error if any Feature Views are not materialized. Default: None
mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None
compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the DataFrame. Examples of valid values include spark and rift. Default: None
Returns
TectonDataFrame: A TectonDataFrame.
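The from_source rules above reduce to a small decision table. The function below is a minimal sketch of that documented behavior, not Tecton's implementation; the name resolve_read_path and its string return labels are hypothetical.

```python
from typing import Optional


def resolve_read_path(from_source: Optional[bool], offline_materialized: bool) -> str:
    """Hypothetical helper modeling the documented from_source rules.

    Returns "offline_store" or "compute_from_raw_data", or raises ValueError
    when from_source=False but the Feature View is not materialized offline.
    """
    if from_source is True:
        # Force recomputation from the original data source.
        return "compute_from_raw_data"
    if from_source is False:
        # Require materialized data; never fall back to raw data.
        if not offline_materialized:
            raise ValueError("Feature View is not materialized offline")
        return "offline_store"
    # from_source is None: prefer the Offline Store, else compute on the fly.
    return "offline_store" if offline_materialized else "compute_from_raw_data"
```

Treating None as a distinct third state (rather than folding it into False) is what lets the default fall back to raw data while from_source=False fails loudly.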
Examples
A FeatureView fv with join key user_id.
- fv.get_historical_features(spine) where spine=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]})
Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the spine.
- fv.get_historical_features(spine, save_as='my_dataset') where spine=pandas.DataFrame({'user_id': [1,2,3], 'date': [datetime(...), datetime(...), datetime(...)]})
Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the spine. Save the DataFrame as a dataset with the name my_dataset.
- fv.get_historical_features(spine, timestamp_key='date_1') where spine=pandas.DataFrame({'user_id': [1,2,3], 'date_1': [datetime(...), datetime(...), datetime(...)], 'date_2': [datetime(...), datetime(...), datetime(...)]})
Fetch historical features from the offline store for users 1, 2, and 3 for the specified timestamps in the date_1 column of the spine.
- fv.get_historical_features(start_time=datetime(...), end_time=datetime(...))
Fetch all historical features from the offline store in the time range specified by start_time and end_time.
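The timestamp_key description says the method fetches the latest features computed before each spine timestamp. The sketch below illustrates that point-in-time lookup in plain Python, assuming feature values are stored as (join key, timestamp, value) rows and reading "before" as strictly earlier. The helper point_in_time_lookup and the sample data are hypothetical; Tecton performs the real join on its compute engine, not like this.

```python
import bisect
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

FeatureRow = Tuple[int, datetime, float]  # (user_id, feature timestamp, feature value)


def point_in_time_lookup(
    feature_rows: List[FeatureRow],
    spine: List[Tuple[int, datetime]],
) -> List[Optional[float]]:
    """For each (user_id, timestamp) spine row, return the latest feature value
    whose timestamp is strictly earlier than the spine timestamp, or None."""
    # Group feature rows by join key and sort each group by timestamp.
    by_key: Dict[int, List[Tuple[datetime, float]]] = defaultdict(list)
    for user_id, ts, value in feature_rows:
        by_key[user_id].append((ts, value))
    for rows in by_key.values():
        rows.sort()

    results: List[Optional[float]] = []
    for user_id, spine_ts in spine:
        rows = by_key.get(user_id, [])
        timestamps = [ts for ts, _ in rows]
        # bisect_left returns how many rows have a timestamp < spine_ts.
        idx = bisect.bisect_left(timestamps, spine_ts)
        results.append(rows[idx - 1][1] if idx > 0 else None)
    return results


features = [
    (1, datetime(2022, 5, 1), 10.0),
    (1, datetime(2022, 5, 3), 30.0),
    (2, datetime(2022, 5, 2), 5.0),
]
spine = [
    (1, datetime(2022, 5, 2)),
    (1, datetime(2022, 5, 4)),
    (2, datetime(2022, 5, 2)),  # the only feature row has the same timestamp, so it is excluded
    (3, datetime(2022, 5, 2)),  # no features exist for user 3
]
values = point_in_time_lookup(features, spine)
```

Here values is [10.0, 30.0, None, None]: each spine row sees only data that existed before its timestamp, which is what prevents future information from leaking into training data.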
get_job(...)
Retrieves data about the specified job (materialization or dataset generation).
Parameters
job_id (str) - ID string of the job.
Returns
TectonJob: JobData object for the job.
get_materialization_job(...)
Retrieves data about the specified materialization job.
Parameters
job_id (str) - ID string of the materialization job.
Returns
MaterializationJob: MaterializationJobData object for the job.
get_online_features(...)
Returns a single Tecton tecton.FeatureVector from the Online Store.
Parameters
join_keys (Mapping[str, Union[int, numpy.int_, str, bytes]]) - The join keys to fetch from the online store.
include_join_keys_in_response (bool) - Whether to include join keys as part of the response FeatureVector. Default: False
Returns
FeatureVector: A tecton.FeatureVector of the results.
Examples
A FeatureView fv with join key user_id.
- fv.get_online_features(join_keys={'user_id': 1})
Fetch the latest features from the online store for user 1.
- fv.get_online_features(join_keys={'user_id': 1}, include_join_keys_in_response=True)
Fetch the latest features from the online store for user 1 and include the join key information (user_id=1) in the returned FeatureVector.
get_partial_aggregates(...)
Returns the partially aggregated tiles between start_time and end_time for a Feature View that uses the Tecton Aggregation Engine.
Parameters
start_time (datetime.datetime) - The start time of the time window to fetch partial aggregates for. The start_time will fall in the time interval of the first tile that is returned.
end_time (datetime.datetime) - The end time of the time window to fetch partial aggregates for. The end_time will fall in the time interval of the last tile that is returned.
mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None
entities (Optional[Union[pyspark_dataframe.DataFrame, pandas.DataFrame, TectonDataFrame]]) - A DataFrame that is used to filter down feature values. If specified, this DataFrame should only contain join key columns. Default: None
from_source (Optional[bool]) - Whether feature values should be recomputed from the original data source. If None, input feature values will be fetched from the Offline Store for Feature Views that have offline materialization enabled, and otherwise computed on the fly from raw data. Use from_source=True to force computing from raw data, and from_source=False to raise an error if any input Feature Views are not materialized. Default: None
compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the DataFrame. Default: None
Returns
TectonDataFrame: A Tecton DataFrame with partially aggregated feature values and the _interval_start_time and _interval_end_time columns for each partial aggregation.
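To make the shape of a partial aggregate concrete, the sketch below buckets raw events into tiles of one aggregation_interval and emits one row per (join key, tile) with _interval_start_time and _interval_end_time. It assumes tiles are aligned to the Unix epoch and uses a sum; the helper names tile_start and partial_sums and the AMOUNT_sum column are hypothetical, not Tecton's output schema.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

# Assumption: tile boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)


def tile_start(ts: datetime, interval: timedelta) -> datetime:
    """Align a timestamp to the start of the tile that contains it."""
    whole_intervals = (ts - EPOCH) // interval  # timedelta // timedelta -> int
    return EPOCH + whole_intervals * interval


def partial_sums(
    events: List[Tuple[str, datetime, int]],
    interval: timedelta,
) -> List[Dict[str, object]]:
    """Group (user_id, timestamp, amount) events into per-tile partial sums."""
    tiles: Dict[Tuple[str, datetime], int] = defaultdict(int)
    for user_id, ts, amount in events:
        tiles[(user_id, tile_start(ts, interval))] += amount
    return [
        {
            "user_id": user_id,
            "AMOUNT_sum": total,
            "_interval_start_time": start,
            "_interval_end_time": start + interval,
        }
        for (user_id, start), total in sorted(tiles.items())
    ]


events = [
    ("u1", datetime(2022, 5, 1, 0, 3), 10),
    ("u1", datetime(2022, 5, 1, 0, 9), 5),
    ("u1", datetime(2022, 5, 1, 0, 12), 7),
]
tiles = partial_sums(events, timedelta(minutes=10))
```

With a 10-minute interval, the first two events share the 00:00-00:10 tile (sum 15) and the third lands in 00:10-00:20 (sum 7).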
get_timestamp_field()
Returns the name of the timestamp field for this Feature View.
Returns
str
list_jobs()
Retrieves the list of all jobs (materialization and dataset generation) for this Feature View or Feature Table.
Returns
List[TectonJob]: List of JobData objects.
list_materialization_jobs()
Retrieves the list of all materialization jobs for this Feature View.
Returns
List[MaterializationJob]: List of MaterializationJobData objects.
materialization_status()
Displays materialization information for the FeatureView, which may include past jobs, scheduled jobs, and job failures. This method returns different information depending on the type of FeatureView.
Parameters
verbose (bool) - If set to True, the method will display additional low-level materialization information, useful for debugging. Default: False
limit (int) - Maximum number of jobs to return. Default: 1000
sort_columns (Optional[str]) - A comma-separated list of column names by which to sort the rows. Default: None
errors_only (bool) - If set to True, the method will only return jobs that failed with an error. Default: False
print_transformation_schema()
Prints the schema of the output of the transformation.
Returns
None
run(...)
This method has been replaced by the .run_transformation() method and will be deprecated in a future release.

To run the Feature View with data from raw data sources, the environment must have access to the data sources.
Parameters
start_time (datetime.datetime) - The start time of the time window to materialize.
end_time (datetime.datetime) - The end time of the time window to materialize.
aggregation_level (Optional[str]) - For Feature Views with aggregations, aggregation_level configures which stage of the aggregation to run up to. The query for Aggregate Feature Views operates in three steps:
1. The feature view query is run over the provided time range. The user-defined transformations are applied over the data source.
2. The result of #1 is aggregated into tiles the size of the aggregation_interval.
3. The tiles from #2 are combined to form the final feature values. The number of tiles that are combined is based on the time_window of the aggregation.

For testing and debugging purposes, to see the output of #1, use aggregation_level="disabled". For #2, use aggregation_level="partial". For #3, use aggregation_level="full". aggregation_level="full" is the default behavior if aggregation_level is not explicitly specified. Default: None
mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None
compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the DataFrame. Default: None
mock_inputs_kwargs (Union[pandas.DataFrame, pyspark_dataframe.DataFrame]) - Keyword arguments for mock sources that should be used instead of fetching directly from raw data sources. The keys should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False.
Returns
TectonDataFrame: A Tecton DataFrame of the results.
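Step 3 of the aggregation_level description (aggregation_level="full") combines tiles into the final feature value, and the number of tiles combined follows from time_window. The sketch below assumes sum aggregation and a window ending exactly on a tile boundary; the helper combine_tiles is hypothetical, and Tecton's engine handles details (window alignment, other aggregation functions) that this ignores.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Tile = Tuple[datetime, datetime, int]  # (_interval_start_time, _interval_end_time, partial sum)


def combine_tiles(tiles: List[Tile], end_time: datetime, time_window: timedelta) -> int:
    """Sum the partial aggregates of every tile inside [end_time - time_window, end_time)."""
    window_start = end_time - time_window
    return sum(
        partial
        for start, end, partial in tiles
        if start >= window_start and end <= end_time
    )


# Eight 10-minute tiles from 00:00 to 01:20, with partial sums 1..8.
t0 = datetime(2022, 5, 1)
step = timedelta(minutes=10)
tiles = [(t0 + i * step, t0 + (i + 1) * step, i + 1) for i in range(8)]

# A 1-hour window ending at 01:20 covers 1h / 10min = 6 tiles (00:20 to 01:20).
total = combine_tiles(tiles, end_time=t0 + 8 * step, time_window=timedelta(hours=1))
```

total is 3 + 4 + 5 + 6 + 7 + 8 = 33. Because tiles are reused across overlapping windows, only the tile edges need recomputing as the window slides.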
run_stream(...)
Starts a streaming job that continuously writes the output records of this FeatureView to a temporary table. The job runs until the execution is terminated.

After records have been written to the table, they can be queried using spark.sql(). If run in a Databricks notebook, Databricks will also automatically visualize the number of incoming records.
Parameters
output_temp_table (str) - The name of the temporary table to write to.
checkpoint_dir (Optional[str]) - A root directory that the streaming job will checkpoint to. Default: None
Returns
streaming.StreamingQuery
Example
- fv.run_stream(output_temp_table="temp_table")
Start a streaming job.
- display(spark.sql("SELECT * FROM temp_table LIMIT 5"))
Query the output table and display the output DataFrame.
run_transformation(...)
Runs the FeatureView transformation as is, without any aggregations or joins. Supports transforming data directly from raw data sources or using mock data.

To run the Feature View transformation with data from raw data sources, the environment must have access to the data sources.
Parameters
start_time (datetime.datetime) - The start time of the time window to materialize.
end_time (datetime.datetime) - The end time of the time window to materialize.
mock_inputs (Optional[Dict[str, Union[pandas.DataFrame, pyspark_dataframe.DataFrame]]]) - Mock sources that should be used instead of fetching directly from raw data sources. The keys of the dictionary should match the Feature View's function parameters. For Feature Views with multiple sources, mocking some data sources and using raw data for others is supported. Using mock_inputs is incompatible with from_source=False. Default: None
compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the DataFrame. Default: None
Returns
TectonDataFrame: A Tecton DataFrame of the results.
Example
import tecton
import pandas
from datetime import datetime
# Example running a non-aggregate Feature View with mock data.
feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_feature_view")
mock_fraud_user_data = pandas.DataFrame(
{
"user_id": ["user_1", "user_2", "user_3"],
"timestamp": [datetime(2022, 5, 1, 0), datetime(2022, 5, 1, 2), datetime(2022, 5, 1, 5)],
"credit_card_number": [1000, 4000, 5000],
}
)
result = feature_view.run_transformation(
start_time=datetime(2022, 5, 1),
end_time=datetime(2022, 5, 2),
mock_inputs={"fraud_users_batch": mock_fraud_user_data},
) # `fraud_users_batch` is the name of this Feature View's data source parameter.
# Example running an aggregate Feature View with real data.
aggregate_feature_view = tecton.get_workspace("my_workspace").get_feature_view("my_aggregate_feature_view")
result = aggregate_feature_view.run_transformation(start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2))
summary()
Displays a human-readable summary.
test_run(...)
Runs the FeatureView using mock data sources. This requires a local Spark session.

Unlike run, test_run is intended for unit testing. It will not make calls to your connected Tecton cluster or attempt to read real data.
Parameters
start_time (datetime.datetime) - The start time of the time window to materialize.
end_time (datetime.datetime) - The end time of the time window to materialize.
aggregation_level (Optional[str]) - For Feature Views with aggregations, aggregation_level configures which stage of the aggregation to run up to. The query for Aggregate Feature Views operates in three logical steps:
1. The feature view query is run over the provided time range. The user-defined transformations are applied over the data source.
2. The result of #1 is aggregated into tiles the size of the aggregation_interval.
3. The tiles from #2 are combined to form the final feature values. The number of tiles that are combined is based on the time_window of the aggregation.

For testing and debugging purposes, to see the output of #1, use aggregation_level="disabled". For #2, use aggregation_level="partial". For #3, use aggregation_level="full". Default: None
compute_mode (Optional[Union[ComputeMode, str]]) - Compute mode to use to produce the DataFrame. Default: None
mock_inputs (Union[pandas.DataFrame, pyspark_dataframe.DataFrame]) - Keyword arguments whose keys match the FeatureView's inputs parameter. Each input name maps to a Spark DataFrame that should be evaluated for that node in the pipeline.
Returns
TectonDataFrame: A tecton.TectonDataFrame object.
transformation_schema()
Returns the schema of the output of the transformation.
Returns
List[types.Field]
trigger_materialization_job(...)
Starts a batch materialization job for this Feature View.
Parameters
start_time (datetime.datetime) - The job will materialize feature values between the start_time and end_time.
end_time (datetime.datetime) - The job will materialize feature values between the start_time and end_time.
online (bool) - Whether the job will materialize features to the online store. The Feature View must be configured with online=True in order to materialize features online.
offline (bool) - Whether the job will materialize features to the offline store. The Feature View must be configured with offline=True in order to materialize features offline.
use_tecton_managed_retries (bool) - If enabled, Tecton will automatically retry failed attempts. Disable to manage your own retry behavior. Default: True
overwrite (bool) - If enabled, you will be able to run materialization jobs for periods that already have materialized data. This operation can be sensitive for Feature Views with existing materialized online data. For the offline store, all previously materialized data between the start time and end time will be dropped. For the online store, all previous data will remain, but may be overwritten by this job. Default: False
Returns
str: ID string of the created materialization job.
Raises
TectonValidationError: If job params are not valid.
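The overwrite description treats the two stores differently: offline data in the period is dropped and rewritten, while online data remains and may be replaced. The toy model below sketches that contrast with a list of rows for the offline store and a dict keyed by join key for the online store. The function overwrite_materialize is hypothetical, and the rule that an online entry is replaced only by a row at least as recent is an assumption about what "may be overwritten" means.

```python
from datetime import datetime
from typing import Dict, List, Tuple

Row = Tuple[str, datetime, float]  # (join key, timestamp, feature value)
OnlineStore = Dict[str, Tuple[datetime, float]]  # join key -> (timestamp, value)


def overwrite_materialize(
    offline: List[Row],
    online: OnlineStore,
    new_rows: List[Row],
    start_time: datetime,
    end_time: datetime,
) -> Tuple[List[Row], OnlineStore]:
    """Toy model of a job run with overwrite=True over [start_time, end_time)."""
    # Offline store: drop previously materialized rows in the period, then append.
    kept = [row for row in offline if not (start_time <= row[1] < end_time)]
    new_offline = kept + new_rows
    # Online store: prior entries remain; an entry is replaced only when the new
    # row is at least as recent (an assumption made for this sketch).
    new_online = dict(online)
    for key, ts, value in new_rows:
        if key not in new_online or ts >= new_online[key][0]:
            new_online[key] = (ts, value)
    return new_offline, new_online


offline = [("u1", datetime(2022, 5, 1), 1.0), ("u1", datetime(2022, 5, 3), 2.0)]
online = {"u1": (datetime(2022, 4, 30), 1.5), "u2": (datetime(2022, 4, 1), 9.0)}
new_rows = [("u1", datetime(2022, 5, 1, 12), 5.0)]
new_offline, new_online = overwrite_materialize(
    offline, online, new_rows, datetime(2022, 5, 1), datetime(2022, 5, 2)
)
```

In this run the May 1 offline row is replaced, the May 3 row outside the period survives, and the online entry for u2 is untouched.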
wait_for_materialization_job()
Blocks until the specified job has completed.
Parameters
job_id (str) - ID string of the materialization job.
timeout (Optional[datetime.timedelta]) - Optional timeout for this function. An exception is raised if the job does not complete within the specified time. Default: None
Returns
MaterializationJob: JobData object for the successful job.
Raises
JobTimeoutException: If the timeout param is specified and the job does not complete within the specified time.
MaterializationJobFailedException: If the materialization job did not reach a successful state.
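A blocking wait with an optional timeout is commonly built as a polling loop. The sketch below shows that pattern under stated assumptions: the caller supplies a get_state function, the state names "SUCCESS" and "FAILURE" are hypothetical, and the local JobTimeout and JobFailed classes only play the roles of the documented exceptions. This is not Tecton's implementation.

```python
import time
from datetime import timedelta
from typing import Callable, Optional


class JobTimeout(Exception):
    """Local stand-in playing the role of JobTimeoutException."""


class JobFailed(Exception):
    """Local stand-in playing the role of MaterializationJobFailedException."""


def wait_for_job(
    get_state: Callable[[], str],
    timeout: Optional[timedelta] = None,
    poll_interval_s: float = 0.01,
) -> str:
    """Poll get_state() until the job succeeds, fails, or the timeout elapses."""
    # A monotonic clock is unaffected by wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout.total_seconds()
    while True:
        state = get_state()
        if state == "SUCCESS":
            return state
        if state == "FAILURE":
            raise JobFailed(f"job ended in state {state}")
        if deadline is not None and time.monotonic() >= deadline:
            raise JobTimeout("job did not complete within the timeout")
        time.sleep(poll_interval_s)


states = iter(["RUNNING", "RUNNING", "SUCCESS"])
final_state = wait_for_job(lambda: next(states))
```

With timeout=None the loop waits indefinitely, which matches the documented default of None.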
with_join_key_map()
Rebinds join keys for a Feature View or Feature Table used in a Feature Service. The keys in join_key_map should be the join keys, and the values should be the feature service overrides.
Parameters
join_key_map (Dict[str, str]) - Dictionary remapping the join key names. Dictionary keys are join keys; values are the feature service override values.
Returns
FeatureReference
Example
from tecton import FeatureService

# The join key for this feature service will be "feature_service_user_id".
feature_service = FeatureService(
    name="feature_service",
    features=[
        my_feature_view.with_join_key_map({"user_id" : "feature_service_user_id"}),
    ],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],
        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id" : "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id" : "recipient_id"}),
    ],
)
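Conceptually, a join_key_map such as {"user_id": "sender_id"} tells the feature service to read the sender_id value from a request and look it up under the Feature View's own user_id key. The plain-dict sketch below illustrates that translation; the helper to_feature_view_keys and the request values are hypothetical, and only keys listed in the map are translated.

```python
from typing import Dict


def to_feature_view_keys(
    request: Dict[str, object],
    join_key_map: Dict[str, str],
) -> Dict[str, object]:
    """Translate feature-service request keys into a Feature View's join keys.

    join_key_map maps Feature View join key -> feature service override key,
    the same direction used by with_join_key_map.
    """
    return {fv_key: request[service_key] for fv_key, service_key in join_key_map.items()}


request = {"transaction_id": "t_1", "sender_id": "u_7", "recipient_id": "u_9"}
sender_lookup = to_feature_view_keys(request, {"user_id": "sender_id"})
recipient_lookup = to_feature_view_keys(request, {"user_id": "recipient_id"})
```

The same user Feature View is therefore queried twice per request, once with user_id="u_7" and once with user_id="u_9", which is why the example also renames each copy with with_name.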
with_name()
Renames a Feature View or Feature Table used in a Feature Service.
Parameters
namespace (str) - The namespace used to prefix the features joined from this FeatureView. By default, namespace is set to the FeatureView name.
Returns
FeatureReference
Examples
from tecton import FeatureService

# The feature view in this feature service will be named "new_named_feature_view" in training data dataframe
# columns and other metadata.
feature_service = FeatureService(
    name="feature_service",
    features=[my_feature_view.with_name("new_named_feature_view")],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id" and will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],
        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id" : "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id" : "recipient_id"}),
    ],
)
validate()
This method is deprecated and will be removed in a future version. As of Tecton version 1.0, objects are validated upon creation, so explicit validation is unnecessary.
Returns
None