Skip to main content
Version: 0.6

Debugging Queries

info

This page applies to Tecton on Spark, only.

Overview

Tecton generates queries when <feature service>.get_historical_features(), <feature view>.get_historical_features(), and <feature view>.run() are run. These functions return a TectonDataFrame object, which contains a query plan. You can call methods of this object to debug the query.

Debugging workflow

To debug a query plan, follow these general steps.

1. Show the query plan

Run <TectonDataFrame object>.explain() to show a query plan for a TectonDataFrame object. The output consists of a tree of nodes, where each node is a step. For example, suppose df is a TectonDataFrame object. Following is example output that is generated from a call to df.explain():

Example output:

<1> RenameColsNode
└── <2> RespectFeatureStartTimeNode
└── <3> AsofJoinFullAggNode(spine, partial_aggregates)
├── <4> [spine] AddRetrievalAnchorTimeNode
│ └── <5> UserSpecifiedDataNode
└── <6> [partial_aggregates] PartialAggNode
└── <7> EntityFilterNode(feature_data, entities)
├── <8> [feature_data] FeatureViewPipelineNode(transactions)
│ └── <9> [transactions] DataSourceScanNode
└── <10> [entities] SelectDistinctNode
└── <11> AddRetrievalAnchorTimeNode
└── <12> UserSpecifiedDataNode

Interpret this tree structure as follows:

  • Each node has a unique id. Node 1 is the root of the tree.
  • Each node has one or more inputs. For example, node 1 has one input (node 2), and node 3 has two inputs (nodes 4 and 6). During query execution, a node can only be executed after all its input nodes have been executed. Thus, query execution starts at the leaf nodes and ends with the root node.
  • Each node with more than one input will name its inputs to distinguish them. For example, the two inputs of node 3 are named spine and partial_aggregates; node 4 is the spine and node 6 is the partial_aggregates.

Showing the query plan with node descriptions

You can show the query plan, with a description of the action taken by each node, by setting description=True in the call to <TectonDataFrame object>.explain(). For example, suppose df is a TectonDataFrame object. Following is example output that is generated from a call to df.explain(description=True):

<1> RenameColsNode: Rename columns with map {'transaction_count_1d_1d': 'user_transaction_counts__transaction_count_1d_1d', 'transaction_count_30d_1d': 'user_transaction_counts__transaction_count_30d_1d', 'transaction_count_90d_1d': 'user_transaction_counts__transaction_count_90d_1d'}. Drop columns ['_anchor_time'].
└── <2> RespectFeatureStartTimeNode: Respect the feature start time for all rows where '_anchor_time' < 2022-04-29T00:00:00+00:00 by setting all feature columns for those rows to NULL
└── <3> AsofJoinFullAggNode(spine, partial_aggregates): Spine asof join partial aggregates, where the join condition is partial_aggregates._anchor_time <= spine._anchor_time and partial aggregates are rolled up to compute full aggregates
├── <4> [spine] AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
│ └── <5> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
└── <6> [partial_aggregates] PartialAggNode: Perform partial aggregations with column '_anchor_time' as the start time of tiles.
└── <7> EntityFilterNode(feature_data, entities): Filter feature data by entities with respect to ['user_id']:
├── <8> [feature_data] FeatureViewPipelineNode(transactions): Evaluate feature view pipeline 'user_transaction_counts' with feature time limits [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
│ └── <9> [transactions] DataSourceScanNode: Scan data source 'transactions_batch' and apply time range filter [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
└── <10> [entities] SelectDistinctNode: Select distinct with columns ['user_id'].
└── <11> AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
└── <12> UserSpecifiedDataNode: User provided data with columns timestamp|user_id

Showing the output schema columns for each node

You can show output schema columns for each node by setting columns=True in the call to <TectonDataFrame object>.explain(). For example, suppose df is a TectonDataFrame object. Following is example output that is generated from a call to df.explain(description=False, columns=True).

<1> RenameColsNode
└── <2> RespectFeatureStartTimeNode
└── <3> AsofJoinFullAggNode(spine, partial_aggregates)
├── <4> [spine] AddRetrievalAnchorTimeNode
│ └── <5> UserSpecifiedDataNode
└── <6> [partial_aggregates] PartialAggNode
└── <7> EntityFilterNode(feature_data, entities)
├── <8> [feature_data] FeatureViewPipelineNode(transactions)
│ └── <9> [transactions] DataSourceScanNode
└── <10> [entities] SelectDistinctNode
└── <11> AddRetrievalAnchorTimeNode
└── <12> UserSpecifiedDataNode

<1> timestamp|user_id|user_transaction_counts__transaction_count_1d_1d|user_transaction_counts__transaction_count_30d_1d|user_transaction_counts__transaction_count_90d_1d
<2> timestamp|user_id|_anchor_time|transaction_count_1d_1d|transaction_count_30d_1d|transaction_count_90d_1d
<3> timestamp|user_id|_anchor_time|transaction_count_1d_1d|transaction_count_30d_1d|transaction_count_90d_1d
<4> timestamp|user_id|_anchor_time
<5> timestamp|user_id
<6> user_id|count_transaction|_anchor_time
<7> user_id|transaction|timestamp
<8> user_id|transaction|timestamp
<9> user_id|transaction_id|category|amt|is_fraud|merchant|merch_lat|merch_long|timestamp
<10> user_id
<11> timestamp|user_id|_anchor_time
<12> timestamp|user_id

Node 1 has columns:

timestamp|user_id|user_transaction_counts__transaction_count_1d_1d|user_transaction_counts__transaction_count_30d_1d|user_transaction_counts__transaction_count_90d_1d

2. Step through the query plan

If the query plan does not indicate the cause of the bug, step through the execution of the query plan.

Stepping to a node

To step to a node in the query plan, call <TectonDataFrame object>.subtree(). The subtree() method extracts a node from the query plan in a TectonDataFrame object. The extracted node is a subtree of the query plan of the object that called subtree().

For example, suppose df is a TectonDataFrame object. The following code extracts node 5 of the query plan.

new_df = df.subtree(5)

Showing the stepped to node

Show the node of the TectonDataFrame that was extracted from the call to the subtree() method:

new_df.explain()

Output:

<1> UserSpecifiedDataNode

Executing the stepped to node

To execute the node, run the show() method:

new_df.to_spark().show(n=1)

One example row is returned because n=1:

timestampuser_id
2022-10-01 00:00:00user_268308151877

Was this page helpful?

🧠 Hi! Ask me anything about Tecton!

Floating button icon