
Add support for EXPLAIN ANALYZE #858

Merged · 2 commits · Aug 12, 2021

Conversation

@alamb (Contributor) commented Aug 11, 2021:

Which issue does this PR close?

Resolves #779

Rationale for this change

EXPLAIN is great for understanding what DataFusion plans to do, but today it is hard, using either the SQL or the DataFrame interface, to understand in more depth what actually happened during execution.

My real use case is being able to see how many rows flowed through each operator, as well as the "time spent" and "rows produced" by each operator, and this PR is a step in that direction.

What changes are included in this PR?

  1. Add basic plan nodes for `EXPLAIN ANALYZE` and `EXPLAIN ANALYZE VERBOSE` SQL (examples below)
  2. Refactor the special-case ParquetStream into RecordBatchReceiverStream for reuse

Are there any user-facing changes?

Yes, EXPLAIN ANALYZE now does something different from EXPLAIN

Example of use

echo "1,A" > /tmp/foo.csv
echo "1,B" >> /tmp/foo.csv
echo "2,A" >> /tmp/foo.csv

Run the CLI

cargo run --bin datafusion-cli
CREATE EXTERNAL TABLE foo(x INT, b VARCHAR) STORED AS CSV LOCATION '/tmp/foo.csv';

Example EXPLAIN ANALYZE output

> EXPLAIN ANALYZE SELECT SUM(x) FROM foo GROUP BY b;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                      |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec, metrics=[]                                                                                                                        |
|                   |   ProjectionExec: expr=[SUM(foo.x)@1 as SUM(x)], metrics=[]                                                                                               |
|                   |     HashAggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[SUM(x)], metrics=[outputRows=2]                                                       |
|                   |       CoalesceBatchesExec: target_batch_size=4096, metrics=[]                                                                                             |
|                   |         RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[sendTime=839560, fetchTime=122528525, repartitionTime=5327877] |
|                   |           HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2]                                                          |
|                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[fetchTime=5660489, repartitionTime=0, sendTime=8012]                              |
|                   |               CsvExec: source=Path(/tmp/foo.csv: [/tmp/foo.csv]), has_header=false, metrics=[]                                                            |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set. Query took 0.012 seconds.
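The timing metrics above (sendTime, fetchTime, repartitionTime) are printed as raw integers; their magnitudes suggest nanoseconds, though as the PR notes below, the reported metrics are still ad hoc. As a throwaway sketch (not a DataFusion API), one could pull the `metrics=[...]` list out of a plan line and convert the presumed-nanosecond timings to milliseconds:

```rust
use std::collections::HashMap;

/// Parse the `metrics=[k=v, ...]` suffix printed by EXPLAIN ANALYZE into a map.
/// Assumes every value is an unsigned integer, as in the output above.
fn parse_metrics(line: &str) -> HashMap<String, u64> {
    let mut out = HashMap::new();
    if let Some(start) = line.find("metrics=[") {
        let inner = &line[start + "metrics=[".len()..];
        if let Some(end) = inner.find(']') {
            for pair in inner[..end].split(',') {
                if let Some((k, v)) = pair.trim().split_once('=') {
                    if let Ok(n) = v.trim().parse::<u64>() {
                        out.insert(k.trim().to_string(), n);
                    }
                }
            }
        }
    }
    out
}

fn main() {
    let line = "RepartitionExec: partitioning=Hash(...), \
                metrics=[sendTime=839560, fetchTime=122528525, repartitionTime=5327877]";
    let m = parse_metrics(line);
    // Interpreting the values as nanoseconds (an assumption): divide by 1e6 for ms.
    println!("fetchTime = {:.1} ms", m["fetchTime"] as f64 / 1e6);
}
```

Note that a per-partition metric summed over 16 partitions can legitimately exceed the wall-clock query time, which is consistent with the numbers shown.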

Example EXPLAIN ANALYZE VERBOSE output

> EXPLAIN ANALYZE VERBOSE SELECT SUM(x) FROM foo GROUP BY b;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                      |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec, metrics=[]                                                                                                                        |
|                   |   ProjectionExec: expr=[SUM(foo.x)@1 as SUM(x)], metrics=[]                                                                                               |
|                   |     HashAggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[SUM(x)], metrics=[outputRows=2]                                                       |
|                   |       CoalesceBatchesExec: target_batch_size=4096, metrics=[]                                                                                             |
|                   |         RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[repartitionTime=6584110, fetchTime=132927514, sendTime=904001] |
|                   |           HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2]                                                          |
|                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[repartitionTime=0, sendTime=8246, fetchTime=6239096]                              |
|                   |               CsvExec: source=Path(/tmp/foo.csv: [/tmp/foo.csv]), has_header=false, metrics=[]                                                            |
| Output Rows       | 2                                                                                                                                                         |
| Duration          | 10.283764ms                                                                                                                                               |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows in set. Query took 0.014 seconds.

Future work:

Note this PR is designed just to hook up / plumb the existing code and metrics we have into SQL (basically what got added in #662). I plan a sequence of follow-on PRs to both improve the metrics infrastructure (#679) and add/fix the metrics that are actually reported so they are consistent. The specific metrics displayed are verbose and somewhat ad hoc at the moment.

@alamb added the `api change` label (Changes the API exposed to users of the crate) on Aug 11, 2021
@github-actions bot added the `datafusion` (Changes in the datafusion crate) and `sql` (SQL Planner) labels on Aug 11, 2021
@@ -213,6 +213,16 @@ pub enum LogicalPlan {
/// The output schema of the explain (2 columns of text)
schema: DFSchemaRef,
},
/// Runs the actual plan, and then prints the physical plan with
/// execution metrics.
Analyze {
Comment from @alamb (author):
NOTE: I chose a new LogicalPlan node because the implementation for ANALYZE is so different than EXPLAIN. However, it would be possible to re-use the same LogicalPlan node if people prefer
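To make the trade-off concrete, here is a hypothetical, trimmed-down sketch of the two sibling variants (field names and the `DFSchemaRef` stand-in are illustrative, not the exact definitions merged in this PR):

```rust
use std::sync::Arc;

// Illustrative stand-in for DataFusion's DFSchemaRef (which wraps a DFSchema).
type DFSchemaRef = Arc<Vec<String>>;

enum LogicalPlan {
    EmptyRelation,
    // Explain is special-cased during planning so it can capture the
    // stringified output of intermediate optimizer passes.
    Explain {
        verbose: bool,
        stringified_plans: Vec<String>,
        schema: DFSchemaRef,
    },
    // Analyze runs the input plan to completion, then renders the physical
    // plan annotated with the metrics collected during execution. It never
    // needs the intermediate strings, hence the separate variant.
    Analyze {
        verbose: bool,
        input: Box<LogicalPlan>,
        schema: DFSchemaRef,
    },
}

fn describe(plan: &LogicalPlan) -> &'static str {
    match plan {
        LogicalPlan::EmptyRelation => "empty",
        LogicalPlan::Explain { .. } => "explain: plans captured during planning",
        LogicalPlan::Analyze { .. } => "analyze: metrics captured during execution",
    }
}

fn main() {
    let schema: DFSchemaRef = Arc::new(vec!["plan_type".into(), "plan".into()]);
    let plan = LogicalPlan::Analyze {
        verbose: false,
        input: Box::new(LogicalPlan::EmptyRelation),
        schema,
    };
    println!("{}", describe(&plan));
}
```

With separate variants, code paths that special-case Explain never have to check whether they are actually handling an Analyze.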

Reply from @NGA-TRAN (Contributor):

I was about to ask about this when I saw the code above that implements analyze as a different function. I am worried about future inconsistency and the headache of keeping them consistent, as well as redundant work when we change or improve something. I would prefer to keep them in the same LogicalPlan.

@alamb force-pushed the alamn/analyze_explain branch from 97b5928 to c7d1d4a on August 11, 2021 19:28
@alamb force-pushed the alamn/analyze_explain branch from c7d1d4a to 3ff7ffd on August 11, 2021 19:30
@alamb marked this pull request as ready for review on August 11, 2021 19:30
@NGA-TRAN (Contributor) left a review comment:

I like the fact that you built this infrastructure before adding new counters; it is easy to understand and review. My only concern is that analyze is implemented along a different path from explain, which in turn seems to differ from the path of the actual plan. I am not sure how tricky it would be to keep them on the same path, but if we can, it will help us keep the results consistent and avoid redundancy and headaches in future work.

.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

roundtrip_test!(plan);
Comment from @NGA-TRAN (Contributor):

When we talk, I need to learn about the effectiveness of this round-trip test that converts a logical/physical plan into proto and back. The tests look simple and easy to understand this way.

Reply from @alamb (author):

To be honest I simply copy/pasted the test for roundtrip_explain below -- I agree the pattern is quite nice
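The round-trip pattern being discussed can be sketched generically. This is a hypothetical miniature, not Ballista's actual protobuf code: encode a plan-like value to bytes, decode it back, and assert the result equals the input.

```rust
// Hand-rolled trivial codec standing in for Ballista's proto serialization.
#[derive(Debug, PartialEq, Clone)]
struct MiniPlan {
    op: String,
    verbose: bool,
}

fn encode(p: &MiniPlan) -> Vec<u8> {
    // One flag byte followed by the UTF-8 bytes of the operator name.
    let mut buf = vec![p.verbose as u8];
    buf.extend_from_slice(p.op.as_bytes());
    buf
}

fn decode(buf: &[u8]) -> MiniPlan {
    MiniPlan {
        verbose: buf[0] != 0,
        op: String::from_utf8(buf[1..].to_vec()).unwrap(),
    }
}

// The essence of roundtrip_test!: decode(encode(plan)) must equal plan.
fn roundtrip(p: &MiniPlan) -> bool {
    decode(&encode(p)) == *p
}

fn main() {
    let plan = MiniPlan { op: "Analyze".to_string(), verbose: true };
    assert!(roundtrip(&plan));
    println!("roundtrip ok");
}
```

The appeal of the pattern is that each new plan node only needs one line in the test, while equality checking exercises the whole encode/decode path.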


@alamb (author) commented Aug 12, 2021:

> I was about to ask about this when I saw the code above that implements analyze as a different function. I am worried about future inconsistency and the headache of keeping them consistent, as well as redundant work when we change or improve something. I would prefer to keep them in the same LogicalPlan.

@NGA-TRAN I also went back and forth on this point. The existing LogicalPlan::Explain is special-cased several times during planning (so it can capture the results of intermediate passes as strings), and since those intermediate strings aren't used by Analyze, we would then have to do an extra check in each special case.

And thus even though there is definitely some redundancy, I eventually concluded that a new LogicalPlan type made things most clear.

The physical plan (ExecutionPlan) for Analyze is also very different, but it would be feasible to use different physical plans for the same logical plan.

@jorgecarleitao (Member) commented:
I agree that a new variant makes sense here, for the reasons @alamb enumerated.

Also, pretty awesome PR! 💯

Successfully merging this pull request may close these issues.

Implement EXPLAIN ANALYZE