Move CBOs and Statistics to physical plan #965

Merged · 25 commits merged into apache:master · Sep 13, 2021

Conversation

@rdettai (Contributor) commented Sep 2, 2021

Which issue does this PR close?

Closes #962.

Rationale for this change

Cost-based optimizations are currently executed during logical plan optimization. This breaks the intended separation between the logical and the physical plan: the logical plan is supposed to be optimized using rule-based optimizations only, and it is during physical plan selection that cost-based optimizations should kick in.

Additional context is available on the mailing list and in a Google design document.

What changes are included in this PR?

  • move the statistics() method from TableProvider to ExecutionPlan (see the sketch just below)
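A minimal sketch of what the relocated method looks like after this change, assuming the synchronous signature settled on later in this thread (the plan is treated as immutable); the module path and supertraits are abbreviated, and the other trait items are elided:

```rust
// Sketch only, not the verbatim diff: statistics() moves from TableProvider onto
// the ExecutionPlan trait, so every physical operator reports its own estimates.
use datafusion::physical_plan::Statistics; // path assumed for the DataFusion of this era

pub trait ExecutionPlan: Send + Sync {
    // ...existing methods (schema, children, execute, ...) elided...

    /// Estimated output statistics of this operator, derived from its inputs.
    fn statistics(&self) -> Statistics;
}
```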

Are there any user-facing changes?

Custom implementations of TableProvider and ExecutionPlan will not work the same: statistics are now reported by the ExecutionPlan rather than by the TableProvider.

Note: this PR is fairly big. It could be split, but avoiding a performance regression would then require an intermediate state where the statistics live on both the logical and the physical plan.

@github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels on Sep 2, 2021
@rdettai rdettai marked this pull request as draft September 2, 2021 10:20
@rdettai (Contributor, Author) commented Sep 2, 2021

TODO:

  • move the statistics() function to the ExecutionPlan trait and provide naive implementations
  • re-implement the aggregate_statistics CBO
  • re-implement the hash_build_probe_order CBO
  • explain/analyze using default stat ok?
  • union using accumulator ok?
    • made scalar min and max public instead
  • test: hash_build_probe_order integration (sql.rs)
  • test: hash_build_probe_order unit test
  • test: union stats
  • test: stat for record batch helper
  • test: projection stats
  • test: cross join stats
  • test: limit stats
  • test: window agg stat
  • test: ballista merge partition stats (shuffle_reader)
  • should we really use async?
    • decided to make it sync and consider the plan as immutable

@xudong963 (Member) commented:

Can I participate in code review?

@houqp (Member) commented Sep 4, 2021

@xudong963 yes, anyone from the community is welcome to participate in code reviews :)

@rdettai force-pushed the move-cbo branch 2 times, most recently from 5c95e0d to 82756c9, on September 8, 2021 13:50
@rdettai rdettai marked this pull request as ready for review September 9, 2021 16:40
@rdettai (Contributor, Author) commented Sep 9, 2021

@Dandandan @houqp @andygrove this PR is finally ready for review. It is a huge chunk, but I think it is worth it.

@alamb (Contributor) commented Sep 9, 2021

FWIW I think running a cost based optimizer on the physical plan is a good idea. I plan to review this PR carefully tomorrow

@houqp (Member) left a comment:

Thank you @rdettai , this is really great work. Looks good to me overall 👍

It looks like the only change for an external TableProvider implementation like DeltaTable is to move the code from the statistics method into the scan method and pass the calculated stats down to the ParquetExec struct.
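A rough, self-contained sketch of that migration (MyTable, MyScanExec, and the local Statistics struct below are illustrative stand-ins, not DeltaTable's or DataFusion's actual types): the computation that used to answer TableProvider::statistics() moves into scan(), and the resulting stats travel with the physical node.

```rust
// Local stand-ins to illustrate where the statistics computation moves.
#[derive(Clone, Default)]
struct Statistics {
    num_rows: Option<usize>,
    total_byte_size: Option<usize>,
}

struct MyTable { files: Vec<String> }
struct MyScanExec { files: Vec<String>, statistics: Statistics }

impl MyTable {
    // Previously this logic lived behind TableProvider::statistics().
    fn compute_statistics(&self) -> Statistics {
        Statistics { num_rows: Some(self.files.len() * 1_000), total_byte_size: None }
    }

    // Analogue of TableProvider::scan(): build the physical node *with* its stats.
    fn scan(&self) -> MyScanExec {
        MyScanExec { files: self.files.clone(), statistics: self.compute_statistics() }
    }
}

impl MyScanExec {
    // Analogue of the new ExecutionPlan::statistics(): report what scan() computed.
    fn statistics(&self) -> Statistics {
        self.statistics.clone()
    }
}
```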

@@ -390,6 +387,10 @@ impl ExecutionPlan for ParquetExec {
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Statistics {
        self.statistics.clone()
Contributor:

Would it be better to avoid this clone here? Should we return &Statistics instead?

@rdettai (Contributor, Author):

For this specific implementation yes, but this is a trait method, and other implementations create new Statistics instances on the fly, which aren't bound to &self's lifetime. To make this trait method return a reference we would need either:

  • all ExecutionPlan instances to be immutable and to always compute their statistics upon instantiation (also related to the comment from @houqp ), or
  • to make the method take &mut self, allowing it to bind newly constructed objects to self's lifetime (in plain terms: allow caching 😉)

I have a personal preference for the first solution (sketched below). I would open a separate PR to address it, as it would require a full review of the immutability of all ExecutionPlan implementations.

As a side note, this would also apply to the other trait methods:

  • schema() should return &SchemaRef,
  • children() should return &[Arc<dyn ExecutionPlan>] or &[&Arc<dyn ExecutionPlan>],
  • output_partitioning() should return &Partitioning
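A minimal sketch of the first option, with local stand-in types (not DataFusion's): every node derives its Statistics eagerly when it is constructed and then stays immutable, so a later trait change could hand out a reference instead of a clone.

```rust
#[derive(Clone, Default)]
struct Statistics { num_rows: Option<usize> }

struct FilterNode { statistics: Statistics }

impl FilterNode {
    fn new(input_stats: &Statistics) -> Self {
        // Derive the output estimate once, up front, instead of on every call
        // (the 50% selectivity guess here is purely illustrative).
        let statistics = Statistics { num_rows: input_stats.num_rows.map(|n| n / 2) };
        Self { statistics }
    }

    // With immutable, eagerly computed stats the accessor can return a reference;
    // today's trait clones instead.
    fn statistics(&self) -> &Statistics {
        &self.statistics
    }
}
```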

@rdettai (Contributor, Author):

Opened #987 to track this

Contributor:

I also like the first option proposed.

@alamb alamb changed the title Move CBOs to physical plan Move CBOs and Statistics to physical plan Sep 10, 2021
@alamb (Contributor) left a comment:

Thank you for this @rdettai -- I started going through this PR and I like what I see so far. As it is a fairly substantial change (understatement) I need some more time to complete my review but will try and devote some more time to it tomorrow.

I also shot a note to the mailing list and slack for some additional eyes: https://lists.apache.org/thread.html/r8f777f9840d660086d1d933d2962559731c7e45ae23ca090456b444d%40%3Cdev.arrow.apache.org%3E

@andygrove (Member) commented:

I am happy to see CBO moving to the physical plan. I don't have time to do a meaningful review on this PR unfortunately but I would be happy to help with testing/debugging, especially with Ballista.

@houqp houqp requested a review from Dandandan September 10, 2021 19:34
@houqp (Member) commented Sep 10, 2021

@rdettai there are some merge conflicts that need to be resolved.

@alamb added the api change (Changes the API exposed to users of the crate) label on Sep 11, 2021
@alamb (Contributor) left a comment:

I went through most of this PR carefully -- great work @rdettai . I still need to review the optimizer passes for changes but I hope to finish that later this weekend.

(@rdettai in the future, especially for large PRs like this one, it would help me review faster if you could highlight which places contain logical changes or new logic and which are just refactoring / moving code around. Breaking it into several PRs would also help, but I understand there are tradeoffs required for that.)

FYI @yjshen (as part of your table provider rework was related to consolidating statistics generation)

datafusion/src/datasource/datasource.rs (review thread resolved)
datafusion/src/physical_plan/mod.rs (review thread resolved, outdated)
datafusion/src/datasource/mod.rs (review thread resolved)
datafusion/src/execution/context.rs (review thread resolved, outdated)
//! Utilizing exact statistics from sources to avoid scanning data
use std::sync::Arc;

use arrow::datatypes::Schema;
Contributor:

This file seems to have substantial changes from eef5e2d src/optimizer/aggregate_statistics.rs -- it is not just a rename. I am mostly noting this for other reviewers and my own notes to review it more carefully

@rdettai (Contributor, Author) commented Sep 12, 2021:

Agreed. @alamb @houqp @Dandandan @andygrove... this part is really the gist of this PR, because this is where we can judge whether moving CBOs to the physical plan is a good idea or not. If you compare this optimization with the one originally performed on the logical plan, you see that it becomes a bit more complex, because the physical plan can take many more shapes (a vtable instead of an enum) and is usually larger than the logical one (one logical aggregation node is represented by multiple physical ones).
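An illustrative, self-contained contrast (local stand-in types, not the PR's code) of why the physical-plan version is wordier: a logical rule can pattern-match the LogicalPlan enum, while a physical rule has to discover concrete node types through as_any() downcasts, and a single logical aggregate may appear as partial + final physical nodes.

```rust
use std::any::Any;

// Logical world: one enum, one match arm.
enum LogicalPlan { Aggregate { group_expr_count: usize }, Other }

fn logical_rule_applies(plan: &LogicalPlan) -> bool {
    matches!(plan, LogicalPlan::Aggregate { group_expr_count: 0 })
}

// Physical world: extensible trait objects, so the rule must downcast node by node.
trait PhysicalNode { fn as_any(&self) -> &dyn Any; }

struct PartialAggregateNode;
impl PhysicalNode for PartialAggregateNode {
    fn as_any(&self) -> &dyn Any { self }
}

fn physical_rule_applies(plan: &dyn PhysicalNode) -> bool {
    // The concrete type is only discoverable via a downcast, and a real rule would
    // also have to look through the final aggregate wrapping this partial one.
    plan.as_any().downcast_ref::<PartialAggregateNode>().is_some()
}
```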

@houqp (Member) commented Sep 12, 2021:

The code is indeed more complex than the logical optimizer. I feel like moving the CBO into the physical plan is the right move from a design point of view. The vtable is hard to avoid because we want to make the physical plan extensible by 3rd party code.

So perhaps what we can do as a follow up of this PR is to come up with better helper function/macro to make it more readable and maintainable. @Dandandan 's #441 and @alamb 's ExprRewriter are good examples of such constructs.

Contributor:

I went over the logic in this module carefully and I like it and agree with @houqp

@@ -0,0 +1,291 @@
// Licensed to the Apache Software Foundation (ASF) under one
Contributor:

This file also has substantial changes from eef5e2d src/optimizer/hash_build_probe_order.rs.

Contributor:

I also went through the logic in here and it looks good to me

datafusion/src/physical_plan/analyze.rs (review thread resolved, outdated)
datafusion/src/physical_plan/common.rs (review thread resolved, outdated)
@alamb (Contributor) left a comment:

I have only 4 more (large) files to complete reviewing. Will not have time until tomorrow. Nice work @rdettai

}

/// [left/right]_col_count are required in case the column statistics are None
fn stats_cartesian_product(
Contributor:

Is the code to calculate cross join cardinality new? If so, perhaps that could be pulled into its own PR

@rdettai (Contributor, Author):

Actually it isn't, because we already had this in the previous version of the HashBuildProbeOrder optimizer.
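For reference, a back-of-the-envelope version of that estimate (local stand-in struct, not the PR's exact stats_cartesian_product): a cross join pairs every left row with every right row, so row counts multiply and so do byte sizes.

```rust
#[derive(Clone, Default)]
struct Stats {
    num_rows: Option<usize>,
    total_byte_size: Option<usize>,
}

fn cross_join_stats(left: &Stats, right: &Stats) -> Stats {
    Stats {
        // Row count multiplies; unknown on either side means unknown overall.
        num_rows: left.num_rows.zip(right.num_rows).map(|(l, r)| l * r),
        // Every left byte is repeated once per right row, and vice versa.
        total_byte_size: match (left.total_byte_size, left.num_rows,
                                right.total_byte_size, right.num_rows) {
            (Some(lb), Some(lr), Some(rb), Some(rr)) => Some(lb * rr + rb * lr),
            _ => None,
        },
    }
}
```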

datafusion/src/physical_plan/empty.rs (review thread resolved)
datafusion/src/physical_plan/explain.rs (review thread resolved, outdated)
datafusion/src/physical_plan/filter.rs (review thread resolved)
datafusion/src/physical_plan/hash_join.rs (review thread resolved)
datafusion/src/physical_plan/limit.rs (review thread resolved)
datafusion/src/physical_plan/mod.rs (review thread resolved, outdated)
@@ -213,6 +234,30 @@ impl ExecutionPlan for LocalLimitExec {
}
}
}

fn statistics(&self) -> Statistics {
Contributor:

this looks like a duplicate of GlobalLimitExec::statistics -- could it be refactored to avoid the duplication?

@rdettai (Contributor, Author):

This is a "funny" case because the code is the same but not for the same reasons. In particular, in the LocalLimitExec there is something very unconfortable: If we know that the number of rows is greater than the limit, we know that the limit operator might kick in on one of the partitions and thus change the number of rows in the output. But the total number of rows might still be greater than the limit as this is only applied locally at the partition level. The right thing to do in that case would be to say that the output statistics is inexact. But if we do so, we loose too much information, and the GlobalLimitExec that comes right after it will not be able to conclude what it should: that the total number of row is exceeding the limit and thus its output will be exactly the limit value.

This is one case where having an interval kind of inexact statistics would be very valuable (see #965 (comment)).
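A small sketch of that reasoning (local stand-in struct): once the known input row count exceeds the limit, a global limit can report exactly `limit` rows, whereas a per-partition limit only knows the true total lies somewhere in [limit, input_rows], which a single Option<usize> plus an exact/inexact flag cannot express.

```rust
#[derive(Clone, Default)]
struct Stats { num_rows: Option<usize> }

fn global_limit_stats(input: &Stats, limit: usize) -> Stats {
    match input.num_rows {
        // Enough input rows: the output is exactly `limit` rows.
        Some(n) if n > limit => Stats { num_rows: Some(limit) },
        // Fewer rows than the limit (or unknown): the limit is a no-op, pass through.
        _ => input.clone(),
    }
}

// A LocalLimitExec in the same situation can only say "at least `limit`, at most
// `limit * n_partitions` rows" -- hence the interval idea mentioned just above.
```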

datafusion/src/physical_plan/union.rs (review thread resolved)
datafusion/tests/provider_filter_pushdown.rs (review thread resolved)
@yjshen (Member) commented Sep 12, 2021

I gave the Google doc a first pass and plan to review this PR later today.

datafusion/src/physical_optimizer/utils.rs (review thread resolved)
}
/// This table statistics are estimates about column
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ColumnStatistics {
Member:

It might be beneficial to compute column statistics through a separate command, store them in a catalog or meta store, and load them lazily when we want a more accurate CBO result, such as the histograms @alamb mentioned earlier.

Otherwise, I think the current approach in this PR, which only considers Statistics, is enough to pick the low-hanging fruit for CBO.

Contributor:

I agree -- I think the approach in this PR is a good step forward and we can make it more sophisticated over time.

FWIW I think in IOx (where we have a custom TableProvider and ExecutionPlan) we do what @yjshen is suggesting: we store statistics in our catalog and translate them to the DataFusion Statistics format when required (no data scanning needed).

https://github.com/influxdata/influxdb_iox/blob/f42f0349ed435c431c5e60855eb8d95fd6e6b646/query/src/provider.rs#L270

@rdettai (Contributor, Author) commented Sep 12, 2021:

My assumption is that the statistics will depend on the query and should not be global to the dataset. The table format is usually able to provide statistics at the partition level (Hive) or at the file level (Iceberg), and it is much more interesting for us to run the optimizations on statistics computed after partition pruning: they will be much more accurate in the context of a given query than global ones.

@alamb (Contributor) commented Sep 12, 2021

I expect to complete review on this PR tomorrow

Modifications according to comments from @alamb
@rdettai (Contributor, Author) commented Sep 12, 2021

Thanks a lot @alamb for this very thorough review.

There is one recurring remark throughout the PR regarding the estimation of statistics when they are not exact. I completely agree with all of these points, most importantly:

  • per field definition of exact/inexact would be more flexible
  • we could specify much more accurately what inexact means (histograms, % of error, value interval...), and thus propagate some very interesting information

Obviously both will be topics for further work, and more importantly they would require a very thorough analysis to avoid bloating the codebase with very complex arithmetic that is never actually used by any optimization rule (or used only by rules that never kick in). One current example is total_byte_size, which I ported from the previous Statistics abstraction but which is often tricky to compute and is currently never used.
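Purely as a thought experiment (not part of this PR), the two follow-up ideas could take a shape like this: exactness tracked per field instead of per Statistics struct, and "inexact" carrying a bound or interval rather than a bare value.

```rust
#[derive(Clone, Debug)]
enum Estimate<T> {
    Exact(T),                    // known precisely, e.g. from file metadata
    Inexact { low: T, high: T }, // bounded estimate, e.g. after a local limit or filter
    Unknown,                     // no information at all
}

#[derive(Clone, Debug)]
struct SketchedStatistics {
    num_rows: Estimate<usize>,        // per-field exactness instead of one global flag
    total_byte_size: Estimate<usize>, // can stay Unknown if too tricky to compute
}
```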

@rdettai (Contributor, Author) commented Sep 13, 2021

I created a task list to gather possible improvements on statistics:

@alamb (Contributor) left a comment:

I think this PR is good to merge. Nice work @rdettai 👍

I am also going to try and update IOx to use this API as well and will report my findings back here

assert_eq!(table.statistics().num_rows, Some(8));
assert_eq!(table.statistics().total_byte_size, Some(671));
assert_eq!(exec.statistics().num_rows, Some(8));
assert_eq!(exec.statistics().total_byte_size, Some(671));
Contributor:

👍

} else if let Some((max, name)) = take_optimizable_max(&**expr, &stats) {
    projections.push((expressions::lit(max), name.to_owned()));
} else {
    // TODO: we need all aggr_expr to be resolved (cf TODO fullres)
Contributor:

I don't understand the notation cf TODO fullres

}
}

// TODO fullres: use statistics even if not all aggr_expr could be resolved
Contributor:

this is a cool possible follow on project that would work well for new contributors -- I would be happy to file a ticket if you agree.


assert_eq!(swapped_join.left().statistics().num_rows, Some(10));
assert_eq!(swapped_join.right().statistics().num_rows, Some(100000));
}
Contributor:

I think it would also be prudent to verify in this test that the join type has been reversed from Left to Right.
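For instance, something along these lines could be appended to the quoted test (hedged: join_type() and JoinType::Right are assumed accessors/variants on the hash join node, as they appear elsewhere in the codebase of that era):

```rust
// hypothetical extra assertion suggested above
assert_eq!(*swapped_join.join_type(), JoinType::Right);
```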

let mut ctx = create_join_context_unbalanced("t1_id", "t2_id")?;
let equivalent_sql = [
"SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
Contributor:

Another good test would be to select the columns in a different order than t1 then t2 -- for example:

        "SELECT t2_name, t1_name, t1_id  FROM t1 LEFT JOIN t2 ON t2_id = t1_id ORDER BY t1_id",

@@ -0,0 +1,284 @@
// Licensed to the Apache Software Foundation (ASF) under one
Contributor:

❤️

@houqp (Member) left a comment:

Outstanding work @rdettai :)

@alamb (Contributor) commented Sep 13, 2021

I am also going to try and update IOx to use this API as well and will report my findings back here

Update: All the IOx tests pass with this PR, so I feel even better about it https://github.com/influxdata/influxdb_iox/pull/2519. 🚀

@alamb (Contributor) commented Sep 13, 2021

@yjshen or @Dandandan any thoughts prior to merging this PR?

Note this may conflict with the AVRO table provider in #910 by @Igosuki

@Dandandan (Contributor) left a comment:

Thank you so much @rdettai for this work!

@alamb (Contributor) commented Sep 13, 2021

Merging this one in so it doesn't stay outstanding any longer.

@alamb alamb merged commit 2950334 into apache:master Sep 13, 2021
@Igosuki (Contributor) commented Sep 14, 2021

Rebased my branch after this, no issues

Labels
api change (Changes the API exposed to users of the crate), datafusion (Changes in the datafusion crate)
Development

Successfully merging this pull request may close these issues.

Moving cost based optimizations to physical planning
8 participants