-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move Datafusion Query Optimizer to library user guide #11563
Merged
Merged
Changes from 1 commit
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
8d2204a
Added Datafusion Query Optimizer to user guide
devesh-2002 3c0fe17
Updated Query optimizer name, Added to index and replaced the README …
devesh-2002 129bd51
Fix RAT check
alamb 29039fe
Merge remote-tracking branch 'apache/main' into devesh
alamb File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
336 changes: 336 additions & 0 deletions
336
docs/source/library-user-guide/datafusion-query-optimizer.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,336 @@ | ||
<!--- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
|
||
# DataFusion Query Optimizer | ||
|
||
[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory | ||
format. | ||
|
||
DataFusion has modular design, allowing individual crates to be re-used in other projects. | ||
|
||
This crate is a submodule of DataFusion that provides a query optimizer for logical plans, and | ||
contains an extensive set of OptimizerRules that may rewrite the plan and/or its expressions so | ||
they execute more quickly while still computing the same result. | ||
|
||
## Running the Optimizer | ||
|
||
The following code demonstrates the basic flow of creating the optimizer with a default set of optimization rules | ||
and applying it to a logical plan to produce an optimized logical plan. | ||
|
||
```rust | ||
|
||
// We need a logical plan as the starting point. There are many ways to build a logical plan: | ||
// | ||
// The `datafusion-expr` crate provides a LogicalPlanBuilder | ||
// The `datafusion-sql` crate provides a SQL query planner that can create a LogicalPlan from SQL | ||
// The `datafusion` crate provides a DataFrame API that can create a LogicalPlan | ||
let logical_plan = ... | ||
|
||
let mut config = OptimizerContext::default(); | ||
let optimizer = Optimizer::new(&config); | ||
let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?; | ||
|
||
fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { | ||
println!( | ||
"After applying rule '{}':\n{}", | ||
rule.name(), | ||
plan.display_indent() | ||
) | ||
} | ||
``` | ||
|
||
## Providing Custom Rules | ||
|
||
The optimizer can be created with a custom set of rules. | ||
|
||
```rust | ||
let optimizer = Optimizer::with_rules(vec![ | ||
Arc::new(MyRule {}) | ||
]); | ||
``` | ||
|
||
## Writing Optimization Rules | ||
|
||
Please refer to the | ||
[optimizer_rule.rs](../../datafusion-examples/examples/optimizer_rule.rs) | ||
example to learn more about the general approach to writing optimizer rules and | ||
then move onto studying the existing rules. | ||
|
||
All rules must implement the `OptimizerRule` trait. | ||
|
||
```rust | ||
/// `OptimizerRule` transforms one ['LogicalPlan'] into another which | ||
/// computes the same results, but in a potentially more efficient | ||
/// way. If there are no suitable transformations for the input plan, | ||
/// the optimizer can simply return it as is. | ||
pub trait OptimizerRule { | ||
/// Rewrite `plan` to an optimized form | ||
fn optimize( | ||
&self, | ||
plan: &LogicalPlan, | ||
config: &dyn OptimizerConfig, | ||
) -> Result<LogicalPlan>; | ||
|
||
/// A human readable name for this optimizer rule | ||
fn name(&self) -> &str; | ||
} | ||
``` | ||
|
||
### General Guidelines | ||
|
||
Rules typical walk the logical plan and walk the expression trees inside operators and selectively mutate | ||
individual operators or expressions. | ||
|
||
Sometimes there is an initial pass that visits the plan and builds state that is used in a second pass that performs | ||
the actual optimization. This approach is used in projection push down and filter push down. | ||
|
||
### Expression Naming | ||
|
||
Every expression in DataFusion has a name, which is used as the column name. For example, in this example the output | ||
contains a single column with the name `"COUNT(aggregate_test_100.c9)"`: | ||
|
||
```text | ||
> select count(c9) from aggregate_test_100; | ||
+------------------------------+ | ||
| COUNT(aggregate_test_100.c9) | | ||
+------------------------------+ | ||
| 100 | | ||
+------------------------------+ | ||
``` | ||
|
||
These names are used to refer to the columns in both subqueries as well as internally from one stage of the LogicalPlan | ||
to another. For example: | ||
|
||
```text | ||
> select "COUNT(aggregate_test_100.c9)" + 1 from (select count(c9) from aggregate_test_100) as sq; | ||
+--------------------------------------------+ | ||
| sq.COUNT(aggregate_test_100.c9) + Int64(1) | | ||
+--------------------------------------------+ | ||
| 101 | | ||
+--------------------------------------------+ | ||
``` | ||
|
||
### Implication | ||
|
||
Because DataFusion identifies columns using a string name, it means it is critical that the names of expressions are | ||
not changed by the optimizer when it rewrites expressions. This is typically accomplished by renaming a rewritten | ||
expression by adding an alias. | ||
|
||
Here is a simple example of such a rewrite. The expression `1 + 2` can be internally simplified to 3 but must still be | ||
displayed the same as `1 + 2`: | ||
|
||
```text | ||
> select 1 + 2; | ||
+---------------------+ | ||
| Int64(1) + Int64(2) | | ||
+---------------------+ | ||
| 3 | | ||
+---------------------+ | ||
``` | ||
|
||
Looking at the `EXPLAIN` output we can see that the optimizer has effectively rewritten `1 + 2` into effectively | ||
`3 as "1 + 2"`: | ||
|
||
```text | ||
> explain select 1 + 2; | ||
+---------------+-------------------------------------------------+ | ||
| plan_type | plan | | ||
+---------------+-------------------------------------------------+ | ||
| logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) | | ||
| | EmptyRelation | | ||
| physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] | | ||
| | PlaceholderRowExec | | ||
| | | | ||
+---------------+-------------------------------------------------+ | ||
``` | ||
|
||
If the expression name is not preserved, bugs such as [#3704](https://github.com/apache/datafusion/issues/3704) | ||
and [#3555](https://github.com/apache/datafusion/issues/3555) occur where the expected columns can not be found. | ||
|
||
### Building Expression Names | ||
|
||
There are currently two ways to create a name for an expression in the logical plan. | ||
|
||
```rust | ||
impl Expr { | ||
/// Returns the name of this expression as it should appear in a schema. This name | ||
/// will not include any CAST expressions. | ||
pub fn display_name(&self) -> Result<String> { | ||
create_name(self) | ||
} | ||
|
||
/// Returns a full and complete string representation of this expression. | ||
pub fn canonical_name(&self) -> String { | ||
format!("{}", self) | ||
} | ||
} | ||
``` | ||
|
||
When comparing expressions to determine if they are equivalent, `canonical_name` should be used, and when creating a | ||
name to be used in a schema, `display_name` should be used. | ||
|
||
### Utilities | ||
|
||
There are a number of utility methods provided that take care of some common tasks. | ||
|
||
### ExprVisitor | ||
|
||
The `ExprVisitor` and `ExprVisitable` traits provide a mechanism for applying a visitor pattern to an expression tree. | ||
|
||
Here is an example that demonstrates this. | ||
|
||
```rust | ||
fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec<Expr>) -> Result<()> { | ||
struct InSubqueryVisitor<'a> { | ||
accum: &'a mut Vec<Expr>, | ||
} | ||
|
||
impl ExpressionVisitor for InSubqueryVisitor<'_> { | ||
fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> { | ||
if let Expr::InSubquery(_) = expr { | ||
self.accum.push(expr.to_owned()); | ||
} | ||
Ok(Recursion::Continue(self)) | ||
} | ||
} | ||
|
||
expression.accept(InSubqueryVisitor { accum: extracted })?; | ||
Ok(()) | ||
} | ||
``` | ||
|
||
### Rewriting Expressions | ||
|
||
The `MyExprRewriter` trait can be implemented to provide a way to rewrite expressions. This rule can then be applied | ||
to an expression by calling `Expr::rewrite` (from the `ExprRewritable` trait). | ||
|
||
The `rewrite` method will perform a depth first walk of the expression and its children to rewrite an expression, | ||
consuming `self` producing a new expression. | ||
|
||
```rust | ||
let mut expr_rewriter = MyExprRewriter {}; | ||
let expr = expr.rewrite(&mut expr_rewriter)?; | ||
``` | ||
|
||
Here is an example implementation which will rewrite `expr BETWEEN a AND b` as `expr >= a AND expr <= b`. Note that the | ||
implementation does not need to perform any recursion since this is handled by the `rewrite` method. | ||
|
||
```rust | ||
struct MyExprRewriter {} | ||
|
||
impl ExprRewriter for MyExprRewriter { | ||
fn mutate(&mut self, expr: Expr) -> Result<Expr> { | ||
match expr { | ||
Expr::Between { | ||
negated, | ||
expr, | ||
low, | ||
high, | ||
} => { | ||
let expr: Expr = expr.as_ref().clone(); | ||
let low: Expr = low.as_ref().clone(); | ||
let high: Expr = high.as_ref().clone(); | ||
if negated { | ||
Ok(expr.clone().lt(low).or(expr.clone().gt(high))) | ||
} else { | ||
Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) | ||
} | ||
} | ||
_ => Ok(expr.clone()), | ||
} | ||
} | ||
} | ||
``` | ||
|
||
### optimize_children | ||
|
||
Typically a rule is applied recursively to all operators within a query plan. Rather than duplicate | ||
that logic in each rule, an `optimize_children` method is provided. This recursively invokes the `optimize` method on | ||
the plan's children and then returns a node of the same type. | ||
|
||
```rust | ||
fn optimize( | ||
&self, | ||
plan: &LogicalPlan, | ||
_config: &mut OptimizerConfig, | ||
) -> Result<LogicalPlan> { | ||
// recurse down and optimize children first | ||
let plan = utils::optimize_children(self, plan, _config)?; | ||
|
||
... | ||
} | ||
``` | ||
|
||
### Writing Tests | ||
|
||
There should be unit tests in the same file as the new rule that test the effect of the rule being applied to a plan | ||
in isolation (without any other rule being applied). | ||
|
||
There should also be a test in `integration-tests.rs` that tests the rule as part of the overall optimization process. | ||
|
||
### Debugging | ||
|
||
The `EXPLAIN VERBOSE` command can be used to show the effect of each optimization rule on a query. | ||
|
||
In the following example, the `type_coercion` and `simplify_expressions` passes have simplified the plan so that it returns the constant `"3.2"` rather than doing a computation at execution time. | ||
|
||
```text | ||
> explain verbose select cast(1 + 2.2 as string) as foo; | ||
+------------------------------------------------------------+---------------------------------------------------------------------------+ | ||
| plan_type | plan | | ||
+------------------------------------------------------------+---------------------------------------------------------------------------+ | ||
| initial_logical_plan | Projection: CAST(Int64(1) + Float64(2.2) AS Utf8) AS foo | | ||
| | EmptyRelation | | ||
| logical_plan after type_coercion | Projection: CAST(CAST(Int64(1) AS Float64) + Float64(2.2) AS Utf8) AS foo | | ||
| | EmptyRelation | | ||
| logical_plan after simplify_expressions | Projection: Utf8("3.2") AS foo | | ||
| | EmptyRelation | | ||
| logical_plan after unwrap_cast_in_comparison | SAME TEXT AS ABOVE | | ||
| logical_plan after decorrelate_where_exists | SAME TEXT AS ABOVE | | ||
| logical_plan after decorrelate_where_in | SAME TEXT AS ABOVE | | ||
| logical_plan after scalar_subquery_to_join | SAME TEXT AS ABOVE | | ||
| logical_plan after subquery_filter_to_join | SAME TEXT AS ABOVE | | ||
| logical_plan after simplify_expressions | SAME TEXT AS ABOVE | | ||
| logical_plan after eliminate_filter | SAME TEXT AS ABOVE | | ||
| logical_plan after reduce_cross_join | SAME TEXT AS ABOVE | | ||
| logical_plan after common_sub_expression_eliminate | SAME TEXT AS ABOVE | | ||
| logical_plan after eliminate_limit | SAME TEXT AS ABOVE | | ||
| logical_plan after projection_push_down | SAME TEXT AS ABOVE | | ||
| logical_plan after rewrite_disjunctive_predicate | SAME TEXT AS ABOVE | | ||
| logical_plan after reduce_outer_join | SAME TEXT AS ABOVE | | ||
| logical_plan after filter_push_down | SAME TEXT AS ABOVE | | ||
| logical_plan after limit_push_down | SAME TEXT AS ABOVE | | ||
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE | | ||
| logical_plan | Projection: Utf8("3.2") AS foo | | ||
| | EmptyRelation | | ||
| initial_physical_plan | ProjectionExec: expr=[3.2 as foo] | | ||
| | PlaceholderRowExec | | ||
| | | | ||
| physical_plan after aggregate_statistics | SAME TEXT AS ABOVE | | ||
| physical_plan after join_selection | SAME TEXT AS ABOVE | | ||
| physical_plan after coalesce_batches | SAME TEXT AS ABOVE | | ||
| physical_plan after repartition | SAME TEXT AS ABOVE | | ||
| physical_plan after add_merge_exec | SAME TEXT AS ABOVE | | ||
| physical_plan | ProjectionExec: expr=[3.2 as foo] | | ||
| | PlaceholderRowExec | | ||
| | | | ||
+------------------------------------------------------------+---------------------------------------------------------------------------+ | ||
``` | ||
|
||
[df]: https://crates.io/crates/datafusion |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps can we name this file
query-optimizer.md
rather thandatafusion-query-optimizer.md
? I think the datafusion is redundant