Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add baseline execution stats to WindowAggExec and UnionExec, and fixup CoalescePartitionsExec #1018

Merged
merged 4 commits into from
Sep 18, 2021

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Sep 17, 2021

Which issue does this PR close?

Finally 🎉 closes #866 (following the same model as #1004).

There are still some operators like Parquet, CSV, Avro, and Json sources that are not instrumented, but I don't have time to devote to intrumenting them now, #1019 tracks that work

Rationale for this change

We want basic understanding of where a plan's time is spent and in what operators. See #866 for more details

What changes are included in this PR?

  1. Instrument WindowAggExec and UnionExec, using the API from Add BaselineMetrics, Timestamp metrics, add for CoalescePartitionsExec, rename output_time -> elapsed_compute #909
  2. Tweak instrumentation for CoalescePartitionsExec so it reports elapsed_compute
  3. Tests for same

Are there any user-facing changes?

More fields in EXPLAIN ANALYZE are now filled out

Example of how explain analyze is looking (dense but packed with good info). I find it quite cool that DataFusion can even plan and execute such queries.

running query: EXPLAIN ANALYZE SELECT count(*) as cnt FROM (SELECT count(*), c1 FROM aggregate_test_100 WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' GROUP BY c1 ORDER BY c1 ) UNION ALL SELECT 1 as cnt UNION ALL SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) LIMIT 2
Query Output:

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                            |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | GlobalLimitExec: limit=2, metrics=[output_rows=2, elapsed_compute=3.023µs]                                                                                                                                                                      |
|                   |   CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=50.216µs]                                                                                                                                                                     |
|                   |     LocalLimitExec: limit=2, metrics=[output_rows=3, elapsed_compute=699ns]                                                                                                                                                                     |
|                   |       UnionExec, metrics=[output_rows=3, elapsed_compute=198.269µs]                                                                                                                                                                             |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=11.908387ms, send_time{inputPartition=0}=3.816µs]                                                   |
|                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=157ns]                                                                                                                                                              |
|                   |             ProjectionExec: expr=[COUNT(UInt8(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=4.125µs]                                                                                                                                   |
|                   |               HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))], metrics=[output_rows=1, elapsed_compute=66.501µs]                                                                                                                  |
|                   |                 CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=11.969µs]                                                                                                                                                       |
|                   |                   HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))], metrics=[output_rows=3, elapsed_compute=89.888µs]                                                                                                            |
|                   |                     RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[fetch_time{inputPartition=0}=11.049667ms, send_time{inputPartition=0}=3.965µs, repart_time{inputPartition=0}=NOT RECORDED]                                       |
|                   |                       SortExec: [c1@1 ASC], metrics=[output_rows=5, elapsed_compute=196.635µs]                                                                                                                                                  |
|                   |                         CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=11.127µs]                                                                                                                                               |
|                   |                           ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1)), c1@0 as c1], metrics=[output_rows=5, elapsed_compute=17.878µs]                                                                                            |
|                   |                             HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))], metrics=[output_rows=5, elapsed_compute=308.373µs]                                                                              |
|                   |                               CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=5, elapsed_compute=77.054µs]                                                                                                                    |
|                   |                                 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 3), metrics=[send_time{inputPartition=0}=NOT RECORDED, repart_time{inputPartition=0}=200.022µs, fetch_time{inputPartition=0}=28.377811ms] |
|                   |                                   HashAggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))], metrics=[output_rows=5, elapsed_compute=585.625µs]                                                                                 |
|                   |                                     CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=99, elapsed_compute=268.21µs]                                                                                                             |
|                   |                                       FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, metrics=[output_rows=99, elapsed_compute=228.181µs]                                                                                                  |
|                   |                                         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=6.765885ms, send_time{inputPartition=0}=3.554µs]                    |
|                   |                                           CsvExec: source=Path(/Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv: [/Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv]), has_header=true, metrics=[]         |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[fetch_time{inputPartition=0}=131.806µs, send_time{inputPartition=0}=11.372µs, repart_time{inputPartition=0}=NOT RECORDED]                                                    |
|                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=211ns]                                                                                                                                                              |
|                   |             ProjectionExec: expr=[1 as cnt], metrics=[output_rows=1, elapsed_compute=60.248µs]                                                                                                                                                  |
|                   |               EmptyExec: produce_one_row=true, metrics=[]                                                                                                                                                                                       |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=10.627386ms, send_time{inputPartition=0}=2.322µs]                                                   |
|                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=287ns]                                                                                                                                                              |
|                   |             CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=11.487µs]                                                                                                                                                           |
|                   |               ProjectionExec: expr=[LEAD(c1,Int64(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=5.57µs]                                                                                                                                |
|                   |                 RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, send_time{inputPartition=0}=4.463µs, fetch_time{inputPartition=0}=618.203µs]                                             |
|                   |                   WindowAggExec: wdw=[LEAD(c1,Int64(1)): Ok(Field { name: "LEAD(c1,Int64(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })], metrics=[output_rows=1, elapsed_compute=140.931µs]     |
|                   |                     ProjectionExec: expr=[1 as c1], metrics=[output_rows=1, elapsed_compute=9.968µs]                                                                                                                                            |
|                   |                       EmptyExec: produce_one_row=true, metrics=[]                                                                                                                                                                               |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Sep 17, 2021
"metrics=[output_rows=1, elapsed_compute="
);
assert_metrics!(
&formatted,
"LocalLimitExec: limit=3",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by making the query more complicated, it has also introduced a LocalLimitExec for testing 🎉

// find partition to execute
for input in self.inputs.iter() {
// Calculate whether partition belongs to the current partition
if partition < input.output_partitioning().partition_count() {
return input.execute(partition).await;
let stream = input.execute(partition).await?;
drop(timer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this drop needed if the next line contains a return?

Copy link
Contributor Author

@alamb alamb Sep 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The drop is needed to satisify the borrow checker: let timer = baseline_metrics.elapsed_compute().timer(); has borrowed from baseline_metrics and rust won't allow baseline metrics to be given to ObserverdStream::new while borrowed.

An alternative is to clone the elapsed compute

        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
        let timer = elapsed_compute.timer();

Which is done in several other parts of this PR. 🤔

Perhaps that would be better to keep the code consistent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in c0b656c

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, looking forward to using those!

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2021

I think something unrelated to this PR is causing the tests to fail. I will look into it later today or tomorrow if no one else gets around to it

@Dandandan
Copy link
Contributor

I think something unrelated to this PR is causing the tests to fail. I will look into it later today or tomorrow if no one else gets around to it

Looks to me just the test in csv_explain_analyze which now contains metrics for the CoalescePartitionsExec which it didn't do before.

@Dandandan Dandandan merged commit 7d8a5cf into apache:master Sep 18, 2021
@Dandandan
Copy link
Contributor

Thanks 👍

@alamb alamb deleted the alamb/stats branch September 19, 2021 10:31
@houqp houqp added the enhancement New feature or request label Nov 6, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add "baseline" metrics to all built in operators
3 participants