Debugging Example: Slow Execution of get_features_for_events()
This page shows how to debug a query that is generated by
get_features_for_events()
. The query is much slower than expected. The
following example uses a Feature View declared for Tecton on Spark, but the
debugging process applies similarly to Tecton on Snowflake and Athena Retrieval.
The setup code below is needed to run the code that follows the setup code.
from tecton import FileConfig, BatchSource, Entity, batch_feature_view, Aggregation, TimeWindow
from tecton import Field, Int64, String, Timestamp
from datetime import datetime, timedelta
batch_config = FileConfig(
uri="s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
file_format="parquet",
timestamp_field="timestamp",
)
transactions_batch = BatchSource(name="transactions_batch", batch_config=batch_config)
user = Entity(name="fraud_user", join_keys=["user_id"])
@batch_feature_view(
sources=[transactions_batch],
entities=[user],
mode="pandas",
aggregation_interval=timedelta(days=1),
aggregations=[
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=1))),
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=30))),
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=90))),
],
online=True,
offline=True,
feature_start_time=datetime(2022, 4, 30),
schema=[Field("user_id", String), Field("transaction", Int64), Field("timestamp", Timestamp)],
)
def user_transaction_counts(transactions):
transactions["transaction"] = 1
return transactions[["user_id", "transaction", "timestamp"]]
The call to get_features_for_events()
that is returning much slower than expected​
Build an events dataframe and then call get_features_for_events()
with the
events dataframe:
import tecton
import pandas as pd
from datetime import datetime
tecton.set_validation_mode("auto")
events = pd.DataFrame.from_dict(
{
"timestamp": [datetime(2022, 10, 1, 0, 0, 0)],
"user_id": ["user_268308151877"],
}
)
df = user_transaction_counts.get_features_for_events(events=events)
The debugging process​
display(df)
user_id | timestamp | user_transaction_counts__transaction_count_1d_1d | user_transaction_counts__transaction_count_30d_1d | user_transaction_counts__transaction_count_90d_1d |
---|---|---|---|---|
user_268308151877 | 2022-10-01 00:00:00 | 0 | 13 | 36 |
The result is expected, but the query took much longer than expected. To debug this result, show the query plan:
df.explain()
<1> RenameColsNode: Rename columns with map {'transaction_count_1d_1d': 'user_transaction_counts_no_filter__transaction_count_1d_1d', 'transaction_count_30d_1d': 'user_transaction_counts_no_filter__transaction_count_30d_1d', 'transaction_count_90d_1d': 'user_transaction_counts_no_filter__transaction_count_90d_1d'}. Drop columns ['_anchor_time'].
└── <2> RespectFeatureStartTimeNode: Respect the feature start time for all rows where '_anchor_time' < 2022-04-29T00:00:00+00:00 by setting all feature columns for those rows to NULL
└── <3> AsofJoinFullAggNode(spine, partial_aggregates): Spine asof join partial aggregates, where the join condition is partial_aggregates._anchor_time <= spine._anchor_time and partial aggregates are rolled up to compute full aggregates
├── <4> [spine] AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts_no_filter' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
│ └── <5> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
└── <6> [partial_aggregates] PartialAggNode: Perform partial aggregations with column '_anchor_time' as the start time of tiles.
└── <7> EntityFilterNode(feature_data, entities): Filter feature data by entities with respect to ['user_id']:
├── <8> [feature_data] FeatureViewPipelineNode(transactions): Evaluate feature view pipeline 'user_transaction_counts_no_filter' with feature time limits [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
│ └── <9> [transactions] DataSourceScanNode: Scan data source 'transactions_batch'. WARNING: there is no time range filter so all rows will be returned. This can be very inefficient.
└── <10> [entities] SelectDistinctNode: Select distinct with columns ['user_id'].
└── <11> AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts_no_filter' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
└── <12> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
Node 9 reveals a potential issue with the DataSourceScanNode
. It is scanning
the full data source transactions_batch
, without any time range filter. This
is an expensive operation.
Hypothesis: The root cause is that the user_transaction_counts
feature
view is not using a
FilteredSource
.
Correcting the feature view to fix the issue​
You can fix the original feature view by using a FilteredSource
:
from tecton import FilteredSource
@batch_feature_view(
sources=[FilteredSource(transactions_batch)],
entities=[user],
mode="spark_sql",
aggregation_interval=timedelta(days=1),
aggregations=[
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=1))),
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=30))),
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=90))),
],
online=True,
offline=True,
feature_start_time=datetime(2022, 4, 30),
)
def user_transaction_counts(transactions):
return f"""
SELECT
user_id,
1 as transaction,
timestamp
FROM
{transactions}
"""
You can then test the original query again:
import tecton
import pandas as pd
from datetime import datetime
tecton.set_validation_mode("auto")
events = pd.DataFrame.from_dict(
{
"timestamp": [datetime(2022, 10, 1, 0, 0, 0)],
"user_id": ["user_268308151877"],
}
)
df = user_transaction_counts.get_features_for_events(events=events)
display(df.to_pandas())
user_id | timestamp | user_transaction_counts__transaction_count_1d_1d | user_transaction_counts__transaction_count_30d_1d | user_transaction_counts__transaction_count_90d_1d |
---|---|---|---|---|
user_268308151877 | 2022-10-01 00:00:00 | 0 | 13 | 36 |
This time, get_features_for_events()
should return much faster. You can also
confirm that this is because it is correctly filtering the data source:
<1> RenameColsNode: Rename columns with map {'transaction_count_1d_1d': 'user_transaction_counts__transaction_count_1d_1d', 'transaction_count_30d_1d': 'user_transaction_counts__transaction_count_30d_1d', 'transaction_count_90d_1d': 'user_transaction_counts__transaction_count_90d_1d'}. Drop columns ['_anchor_time'].
└── <2> RespectFeatureStartTimeNode: Respect the feature start time for all rows where '_anchor_time' < 2022-04-29T00:00:00+00:00 by setting all feature columns for those rows to NULL
└── <3> AsofJoinFullAggNode(spine, partial_aggregates): Spine asof join partial aggregates, where the join condition is partial_aggregates._anchor_time <= spine._anchor_time and partial aggregates are rolled up to compute full aggregates
├── <4> [spine] AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
│ └── <5> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
└── <6> [partial_aggregates] PartialAggNode: Perform partial aggregations with column '_anchor_time' as the start time of tiles.
└── <7> EntityFilterNode(feature_data, entities): Filter feature data by entities with respect to ['user_id']:
├── <8> [feature_data] FeatureViewPipelineNode(transactions): Evaluate feature view pipeline 'user_transaction_counts' with feature time limits [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
│ └── <9> [transactions] DataSourceScanNode: Scan data source 'transactions_batch' and apply time range filter [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
└── <10> [entities] SelectDistinctNode: Select distinct with columns ['user_id'].
└── <11> AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
└── <12> UserSpecifiedDataNode: User provided data with columns timestamp|user_id