tecton.declarative.feature_view.MaterializedFeatureView

class tecton.declarative.feature_view.MaterializedFeatureView(name, pipeline_function, sources, entities, online, offline, offline_store, online_store, aggregation_interval, aggregations, ttl, feature_start_time, batch_schedule, online_serving_index, batch_compute, stream_compute, monitor_freshness, expected_feature_freshness, alert_email, description, owner, tags, inferred_transform, feature_view_type, timestamp_field, data_source_type, user_function, incremental_backfills, aggregation_mode=None, max_batch_aggregation_interval=None, output_stream=None)

Stream/Batch Feature View class to include in Feature Services or to use in unit testing.

Do not instantiate this class directly. Use a decorator-based constructor instead, such as @batch_feature_view or @stream_feature_view.
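
For example, a Batch Feature View is typically declared with the @batch_feature_view decorator. The sketch below is illustrative only: the fraud_users_batch source, user entity, and query logic are assumptions modeled on the test example further down, not a verbatim repository definition.

from datetime import datetime, timedelta

from tecton import batch_feature_view

# Assumed repo objects, defined elsewhere in a typical feature repository.
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch


@batch_feature_view(
    sources=[fraud_users_batch],
    entities=[user],
    mode="spark_sql",
    feature_start_time=datetime(2022, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=120),
)
def user_credit_card_issuer(fraud_users_batch):
    # Illustrative transformation, consistent with the unit test example below.
    return f"""
        SELECT
            user_id,
            signup_timestamp,
            CASE
                WHEN CAST(cc_num AS STRING) LIKE '4%' THEN 'Visa'
                WHEN CAST(cc_num AS STRING) LIKE '5%' THEN 'MasterCard'
                WHEN CAST(cc_num AS STRING) LIKE '6%' THEN 'Discover'
                ELSE 'other'
            END AS credit_card_issuer
        FROM {fraud_users_batch}
    """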

Methods

__init__

Do not use this constructor directly. It is the internal constructor for materialized FeatureViews.

run

Run the FeatureView using mock data sources.

with_join_key_map

Used to rebind join keys for a Feature View used in a Feature Service.

with_name

Used to rename a Feature View used in a Feature Service.

__init__(name, pipeline_function, sources, entities, online, offline, offline_store, online_store, aggregation_interval, aggregations, ttl, feature_start_time, batch_schedule, online_serving_index, batch_compute, stream_compute, monitor_freshness, expected_feature_freshness, alert_email, description, owner, tags, inferred_transform, feature_view_type, timestamp_field, data_source_type, user_function, incremental_backfills, aggregation_mode=None, max_batch_aggregation_interval=None, output_stream=None)

Do not use this constructor directly. It is the internal constructor for materialized FeatureViews.

run(spark, start_time, end_time, aggregation_level=None, **mock_sources)

Run the FeatureView using mock data sources. This requires a local Spark session.

Parameters
  • spark (SparkSession) – The Spark session used to run this FeatureView’s pipeline.

  • start_time (Optional[datetime]) – The start time of the time window to materialize. If not set, defaults to end_time minus batch_schedule.

  • end_time (Optional[datetime]) – The end time of the time window to materialize. If not set, defaults to start_time plus batch_schedule.

  • aggregation_level (Optional[str]) –

    For feature views with aggregations, aggregation_level configures what stage of the aggregation to run up to.

    The query for Aggregate Feature Views operates in three logical steps:

    1. The feature view query is run over the provided time range. The user-defined transformations are applied over the data source.

    2. The result of #1 is aggregated into tiles the size of the aggregation_interval.

    3. The tiles from #2 are combined to form the final feature values. The number of tiles combined is based on the time_window of the aggregation.

    For testing and debugging, use aggregation_level="disabled" to see the output of #1, aggregation_level="partial" for #2, and aggregation_level="full" for #3 (see the sketch after this parameter list).

    aggregation_level="full" is the default behavior.

  • **mock_sources – kwargs with the same keys as the FeatureView’s inputs parameter. Each input name maps to a Spark DataFrame that is used in place of that node in the pipeline.
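
For an Aggregate Feature View, the three aggregation levels can be exercised as in the following sketch. The user_transaction_counts feature view, its transactions_batch input, the spark session, and mock_transactions_df are hypothetical names used only for illustration:

from datetime import datetime

# Output of #1: the user-defined transformation, no aggregation applied.
raw_rows = user_transaction_counts.run(
    spark=spark,
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    aggregation_level="disabled",
    transactions_batch=mock_transactions_df)

# Output of #2: partial aggregation tiles, one per aggregation_interval.
partial_tiles = user_transaction_counts.run(
    spark=spark,
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    aggregation_level="partial",
    transactions_batch=mock_transactions_df)

# Output of #3: tiles combined into final feature values (the default).
final_features = user_transaction_counts.run(
    spark=spark,
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    aggregation_level="full",
    transactions_batch=mock_transactions_df)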

Example:

from datetime import datetime
import pandas
from fraud.features.batch_features.user_credit_card_issuer import user_credit_card_issuer


# The `tecton_pytest_spark_session` is a PyTest fixture that provides a
# Tecton-defined PySpark session for testing Spark transformations and feature
# views.
def test_user_credit_card_issuer(tecton_pytest_spark_session):
    input_pandas_df = pandas.DataFrame({
        "user_id": ["user_1", "user_2", "user_3", "user_4"],
        "signup_timestamp": [datetime(2022, 5, 1)] * 4,
        "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
    })
    input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)

    # Simulate materializing features for May 1st.
    output = user_credit_card_issuer.run(
        spark=tecton_pytest_spark_session,
        start_time=datetime(2022, 5, 1),
        end_time=datetime(2022, 5, 2),
        fraud_users_batch=input_spark_df)

    actual = output.toPandas()

    expected = pandas.DataFrame({
        "user_id": ["user_1", "user_2", "user_3", "user_4"],
        "signup_timestamp":  [datetime(2022, 5, 1)] * 4,
        "credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"],
    })

    pandas.testing.assert_frame_equal(actual, expected)

Returns

A tecton.DataFrame object.
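
A minimal sketch of inspecting the result, assuming the to_spark() and to_pandas() accessors on tecton.DataFrame (output, input_spark_df, and the session reuse the names from the example above):

output = user_credit_card_issuer.run(
    spark=tecton_pytest_spark_session,
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    fraud_users_batch=input_spark_df)

spark_df = output.to_spark()    # assumed accessor: the underlying Spark DataFrame
pandas_df = output.to_pandas()  # assumed accessor: materialize into pandas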

with_join_key_map(join_key_map)

Used to rebind join keys for a Feature View used in a Feature Service. The keys in join_key_map should be the feature view join keys, and the values should be the feature service overrides.

from tecton import FeatureService

# The join key for this feature service will be "feature_service_user_id".
feature_service = FeatureService(
    name="feature_service",
    features=[
        my_feature_view.with_join_key_map({"user_id": "feature_service_user_id"}),
    ],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id", and it will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],

        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
    ],
)

with_name(namespace)

Used to rename a Feature View used in a Feature Service.

from tecton import FeatureService

# The feature view in this feature service will be named "new_named_feature_view" in training data DataFrame
# columns and other metadata.
feature_service = FeatureService(
    name="feature_service",
    features=[
        my_feature_view.with_name("new_named_feature_view")
    ],
)

# Here is a more sophisticated example. The join keys for this feature service will be "transaction_id",
# "sender_id", and "recipient_id", and it will contain three feature views named "transaction_features",
# "sender_features", and "recipient_features".
transaction_fraud_service = FeatureService(
    name="transaction_fraud_service",
    features=[
        # Select a subset of features from a feature view.
        transaction_features[["amount"]],

        # Rename a feature view and/or rebind its join keys. In this example, we want user features for both the
        # transaction sender and recipient, so include the feature view twice and bind it to two different feature
        # service join keys.
        user_features.with_name("sender_features").with_join_key_map({"user_id": "sender_id"}),
        user_features.with_name("recipient_features").with_join_key_map({"user_id": "recipient_id"}),
    ],
)

Attributes

name

Name of this Tecton Object.