Debugging Queries
Overview
Tecton generates queries when get_features_for_events(),
get_features_in_range(), get_partial_aggregates(), and run_transformation()
are called. These functions return a TectonDataFrame object, which contains a
query plan. You can call methods on this object to debug the query.
Debugging workflow
To debug a query plan, follow these general steps.
1. Show the query plan
Run <TectonDataFrame object>.explain() to show a query plan for a
TectonDataFrame object. The output consists of a tree of nodes, where each
node is a step. For example, suppose df is a TectonDataFrame object.
Following is example output generated from a call to df.explain():
<1> RenameColsNode
└── <2> RespectFeatureStartTimeNode
    └── <3> AsofJoinFullAggNode(spine, partial_aggregates)
        ├── <4> [spine] AddRetrievalAnchorTimeNode
        │   └── <5> UserSpecifiedDataNode
        └── <6> [partial_aggregates] PartialAggNode
            └── <7> EntityFilterNode(feature_data, entities)
                ├── <8> [feature_data] FeatureViewPipelineNode(transactions)
                │   └── <9> [transactions] DataSourceScanNode
                └── <10> [entities] SelectDistinctNode
                    └── <11> AddRetrievalAnchorTimeNode
                        └── <12> UserSpecifiedDataNode
Interpret this tree structure as follows:
- Each node has a unique id. Node 1 is the root of the tree.
- Each node has one or more inputs. For example, node 1 has one input (node 2), and node 3 has two inputs (nodes 4 and 6). During query execution, a node can only be executed after all its input nodes have been executed. Thus, query execution starts at the leaf nodes and ends with the root node.
- Each node with more than one input names its inputs to distinguish them. For example, the two inputs of node 3 are named spine and partial_aggregates; node 4 is the spine and node 6 is the partial_aggregates.
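The execution order described above (every node runs only after all of its inputs) amounts to a post-order traversal of the tree. The following sketch illustrates this with a hypothetical Node class standing in for Tecton's internal query plan nodes, using the 12-node plan from the example output:

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    # Hypothetical stand-in for a query plan node; not Tecton's internal type.
    id: int
    inputs: list["Node"] = field(default_factory=list)

def execution_order(root: Node) -> list[int]:
    """Return node ids in a valid execution order: every node appears
    after all of its inputs (a post-order traversal)."""
    order: list[int] = []
    def visit(node: Node) -> None:
        for child in node.inputs:
            visit(child)
        order.append(node.id)
    visit(root)
    return order

# The 12-node plan from the example explain() output:
plan = Node(1, [Node(2, [Node(3, [
    Node(4, [Node(5)]),
    Node(6, [Node(7, [
        Node(8, [Node(9)]),
        Node(10, [Node(11, [Node(12)])]),
    ])]),
])])])

print(execution_order(plan))  # [5, 4, 9, 8, 12, 11, 10, 7, 6, 3, 2, 1]
```

Note that leaf nodes (5, 9, 12) execute first and the root (node 1) executes last, matching the description above.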
Showing the query plan with node descriptions
You can show the query plan, with a description of the action taken by each
node, by setting description=True in the call to
<TectonDataFrame object>.explain(). For example, suppose df is a
TectonDataFrame object. Following is example output that is generated from a
call to df.explain(description=True):
<1> RenameColsNode: Rename columns with map {'transaction_count_1d_1d': 'user_transaction_counts__transaction_count_1d_1d', 'transaction_count_30d_1d': 'user_transaction_counts__transaction_count_30d_1d', 'transaction_count_90d_1d': 'user_transaction_counts__transaction_count_90d_1d'}. Drop columns ['_anchor_time'].
└── <2> RespectFeatureStartTimeNode: Respect the feature start time for all rows where '_anchor_time' < 2022-04-29T00:00:00+00:00 by setting all feature columns for those rows to NULL
    └── <3> AsofJoinFullAggNode(spine, partial_aggregates): Spine asof join partial aggregates, where the join condition is partial_aggregates._anchor_time <= spine._anchor_time and partial aggregates are rolled up to compute full aggregates
        ├── <4> [spine] AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
        │   └── <5> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
        └── <6> [partial_aggregates] PartialAggNode: Perform partial aggregations with column '_anchor_time' as the start time of tiles.
            └── <7> EntityFilterNode(feature_data, entities): Filter feature data by entities with respect to ['user_id'].
                ├── <8> [feature_data] FeatureViewPipelineNode(transactions): Evaluate feature view pipeline 'user_transaction_counts' with feature time limits [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
                │   └── <9> [transactions] DataSourceScanNode: Scan data source 'transactions_batch' and apply time range filter [2022-07-03T00:00:00+00:00, 2022-10-02T00:00:00+00:00)
                └── <10> [entities] SelectDistinctNode: Select distinct with columns ['user_id'].
                    └── <11> AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
                        └── <12> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
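The anchor time formula described in nodes 4 and 11, T - (T % batch_schedule) - tile_interval, can be checked with ordinary arithmetic on epoch seconds. A sketch using the values from this example (daily batch schedule and a 1-day tile interval, both 86400 seconds):

```python
from datetime import datetime, timezone

BATCH_SCHEDULE = 86400  # seconds; daily materialization, as in the example
TILE_INTERVAL = 86400   # seconds; tile size equals the 1-day tile interval

def anchor_time(t: datetime) -> datetime:
    """Start time of the most recent tile available for retrieval:
    T - (T % batch_schedule) - tile_interval, computed on epoch seconds."""
    epoch = int(t.timestamp())
    anchor = epoch - (epoch % BATCH_SCHEDULE) - TILE_INTERVAL
    return datetime.fromtimestamp(anchor, tz=timezone.utc)

# A spine timestamp of 2022-10-01 12:34:56 UTC maps to the tile that
# started at midnight the previous day:
t = datetime(2022, 10, 1, 12, 34, 56, tzinfo=timezone.utc)
print(anchor_time(t))  # 2022-09-30 00:00:00+00:00
```

In other words, the timestamp is truncated down to the most recent batch schedule boundary, then moved back one tile interval, because the most recent tile is not yet fully materialized and available.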
Showing the output schema columns for each node
You can show output schema columns for each node by setting columns=True in
the call to <TectonDataFrame object>.explain(). For example, suppose df is a
TectonDataFrame object. Following is example output that is generated from a
call to df.explain(description=False, columns=True):
<1> RenameColsNode
└── <2> RespectFeatureStartTimeNode
    └── <3> AsofJoinFullAggNode(spine, partial_aggregates)
        ├── <4> [spine] AddRetrievalAnchorTimeNode
        │   └── <5> UserSpecifiedDataNode
        └── <6> [partial_aggregates] PartialAggNode
            └── <7> EntityFilterNode(feature_data, entities)
                ├── <8> [feature_data] FeatureViewPipelineNode(transactions)
                │   └── <9> [transactions] DataSourceScanNode
                └── <10> [entities] SelectDistinctNode
                    └── <11> AddRetrievalAnchorTimeNode
                        └── <12> UserSpecifiedDataNode
<1> timestamp|user_id|user_transaction_counts__transaction_count_1d_1d|user_transaction_counts__transaction_count_30d_1d|user_transaction_counts__transaction_count_90d_1d
<2> timestamp|user_id|_anchor_time|transaction_count_1d_1d|transaction_count_30d_1d|transaction_count_90d_1d
<3> timestamp|user_id|_anchor_time|transaction_count_1d_1d|transaction_count_30d_1d|transaction_count_90d_1d
<4> timestamp|user_id|_anchor_time
<5> timestamp|user_id
<6> user_id|count_transaction|_anchor_time
<7> user_id|transaction|timestamp
<8> user_id|transaction|timestamp
<9> user_id|transaction_id|category|amt|is_fraud|merchant|merch_lat|merch_long|timestamp
<10> user_id
<11> timestamp|user_id|_anchor_time
<12> timestamp|user_id
Node 1 has columns:
timestamp|user_id|user_transaction_counts__transaction_count_1d_1d|user_transaction_counts__transaction_count_30d_1d|user_transaction_counts__transaction_count_90d_1d
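Comparing adjacent nodes' column lists shows exactly what each step adds or removes, which is useful when a query fails with a missing-column error. For example, diffing the outputs of node 2 and node 1 (copied from the listing above) recovers the renames and drop described by RenameColsNode:

```python
# Output columns of node 2 and node 1, copied from the explain() listing.
node2 = "timestamp|user_id|_anchor_time|transaction_count_1d_1d|transaction_count_30d_1d|transaction_count_90d_1d".split("|")
node1 = "timestamp|user_id|user_transaction_counts__transaction_count_1d_1d|user_transaction_counts__transaction_count_30d_1d|user_transaction_counts__transaction_count_90d_1d".split("|")

# Columns present in node 2 but gone after node 1, and vice versa.
removed = sorted(set(node2) - set(node1))
added = sorted(set(node1) - set(node2))

# RenameColsNode renamed the three feature columns (so the unprefixed
# names disappear and prefixed names appear) and dropped '_anchor_time'.
print(removed)
print(added)
```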
2. Step through the query plan
If the query plan does not indicate the cause of the bug, step through the execution of the query plan.
Stepping to a node
To step to a node in the query plan, call <TectonDataFrame object>.subtree(),
passing the id of the node. The subtree() method returns a new
TectonDataFrame whose query plan is the subtree of the original query plan
rooted at that node.
For example, suppose df is a TectonDataFrame object. The following code
extracts node 5 of the query plan.
new_df = df.subtree(5)
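Conceptually, subtree() is a depth-first search for the node with the given id, returning the plan re-rooted there. A minimal sketch using plain (id, name, children) tuples as a hypothetical stand-in for Tecton's internal representation:

```python
# Hypothetical stand-in for the <4>/<5> portion of the example plan:
# each node is an (id, name, children) tuple.
plan = (4, "AddRetrievalAnchorTimeNode", [(5, "UserSpecifiedDataNode", [])])

def subtree(node, node_id):
    """Depth-first search: return the subtree rooted at node_id, or None
    if no node with that id exists in the plan."""
    nid, _name, children = node
    if nid == node_id:
        return node
    for child in children:
        found = subtree(child, node_id)
        if found is not None:
            return found
    return None

print(subtree(plan, 5))  # (5, 'UserSpecifiedDataNode', [])
```

Note that the ids in the extracted TectonDataFrame are renumbered starting from 1, which is consistent with node 5 printing as <1> in the output below.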
Showing the stepped-to node
Show the node of the TectonDataFrame that was extracted from the call to the
subtree() method:
new_df.explain()
Output:
<1> UserSpecifiedDataNode
Executing the stepped-to node
To execute the extracted query, convert it to a Spark DataFrame and call the show() method:
new_df.to_spark().show(n=1)
One example row is returned because n=1:
| timestamp | user_id |
|---|---|
| 2022-10-01 00:00:00 | user_1 |