From c944b812c24b5032499296ee4244efc1cf2cf4f2 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Mon, 4 Mar 2024 22:05:28 +0300 Subject: [PATCH] Simplifications and comment improvements (#2) --- datafusion/common/src/tree_node.rs | 682 ++++++++---------- .../core/src/datasource/listing/helpers.rs | 24 +- .../physical_plan/parquet/row_filter.rs | 19 +- .../physical_optimizer/coalesce_batches.rs | 6 +- .../enforce_distribution.rs | 23 +- .../src/physical_optimizer/enforce_sorting.rs | 20 +- .../src/physical_optimizer/join_selection.rs | 8 +- .../limited_distinct_aggregation.rs | 11 +- .../physical_optimizer/projection_pushdown.rs | 6 +- .../core/src/physical_optimizer/pruning.rs | 9 +- .../physical_optimizer/topk_aggregation.rs | 11 +- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 15 +- datafusion/expr/src/expr.rs | 27 +- datafusion/expr/src/expr_rewriter/mod.rs | 15 +- datafusion/expr/src/expr_rewriter/order_by.rs | 1 + datafusion/expr/src/logical_plan/display.rs | 4 +- datafusion/expr/src/logical_plan/plan.rs | 4 +- datafusion/expr/src/tree_node/expr.rs | 4 +- datafusion/expr/src/tree_node/plan.rs | 15 +- .../src/analyzer/count_wildcard_rule.rs | 4 +- .../src/analyzer/inline_table_scan.rs | 37 +- .../optimizer/src/analyzer/rewrite_expr.rs | 69 +- datafusion/optimizer/src/analyzer/subquery.rs | 11 +- .../optimizer/src/analyzer/type_coercion.rs | 44 +- .../optimizer/src/common_subexpr_eliminate.rs | 14 +- datafusion/optimizer/src/decorrelate.rs | 25 +- .../src/decorrelate_predicate_subquery.rs | 17 +- datafusion/optimizer/src/plan_signature.rs | 2 +- .../optimizer/src/scalar_subquery_to_join.rs | 33 +- .../simplify_expressions/expr_simplifier.rs | 62 +- .../src/simplify_expressions/guarantees.rs | 8 +- .../simplify_expressions/inlist_simplifier.rs | 6 +- .../src/unwrap_cast_in_comparison.rs | 13 +- datafusion/optimizer/src/utils.rs | 8 +- .../physical-expr/src/equivalence/class.rs | 19 +- .../physical-expr/src/equivalence/mod.rs | 17 +- .../src/equivalence/projection.rs | 5 +- .../src/equivalence/properties.rs | 15 +- .../physical-expr/src/expressions/case.rs | 16 +- datafusion/physical-expr/src/tree_node.rs | 3 +- datafusion/physical-expr/src/utils/mod.rs | 8 +- .../src/joins/stream_join_utils.rs | 6 +- datafusion/physical-plan/src/joins/utils.rs | 6 +- datafusion/physical-plan/src/lib.rs | 2 +- datafusion/physical-plan/src/tree_node.rs | 3 +- datafusion/sql/src/utils.rs | 14 +- .../sqllogictest/test_files/group_by.slt | 2 +- 47 files changed, 663 insertions(+), 710 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 3de0c5bf945c..2d653a27c47b 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -22,88 +22,52 @@ use std::sync::Arc; use crate::Result; -/// This macro is used to determine continuation after a top-down closure is applied -/// during visiting traversals. +/// This macro is used to control continuation behaviors during tree traversals +/// based on the specified direction. Depending on `$DIRECTION` and the value of +/// the given expression (`$EXPR`), which should be a variant of [`TreeNodeRecursion`], +/// the macro results in the following behavior: /// -/// If the function returns [`TreeNodeRecursion::Continue`], the normal execution of the -/// function continues. -/// If it returns [`TreeNodeRecursion::Jump`], the function returns with (propagates up) -/// [`TreeNodeRecursion::Continue`] to jump next recursion step, bypassing further -/// exploration of the current step. 
-/// In case of [`TreeNodeRecursion::Stop`], the function return with (propagates up) -/// [`TreeNodeRecursion::Stop`] and recursion halts. +/// - If the expression returns [`TreeNodeRecursion::Continue`], normal execution +/// continues. +/// - If it returns [`TreeNodeRecursion::Stop`], recursion halts and propagates +/// [`TreeNodeRecursion::Stop`]. +/// - If it returns [`TreeNodeRecursion::Jump`], the continuation behavior depends +/// on the traversal direction: +/// - For `UP` direction, the function returns with [`TreeNodeRecursion::Jump`], +/// bypassing further bottom-up closures until the next top-down closure. +/// - For `DOWN` direction, the function returns with [`TreeNodeRecursion::Continue`], +/// skipping further exploration. +/// - If no direction is specified, `Jump` is treated like `Continue`. #[macro_export] -macro_rules! handle_visit_recursion_down { - ($EXPR:expr) => { - match $EXPR { - TreeNodeRecursion::Continue => {} - TreeNodeRecursion::Jump => return Ok(TreeNodeRecursion::Continue), - TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop), - } - }; -} - -/// This macro is used to determine continuation between visiting siblings during visiting -/// traversals. -/// -/// If the function returns [`TreeNodeRecursion::Continue`] or -/// [`TreeNodeRecursion::Jump`], the normal execution of the function continues. -/// In case of [`TreeNodeRecursion::Stop`], the function return with (propagates up) -/// [`TreeNodeRecursion::Stop`] and recursion halts. macro_rules! handle_visit_recursion { - ($EXPR:expr) => { - match $EXPR { - TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {} - TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop), - } + // Internal helper macro for handling the `Jump` case based on the direction: + (@handle_jump UP) => { + return Ok(TreeNodeRecursion::Jump) + }; + (@handle_jump DOWN) => { + return Ok(TreeNodeRecursion::Continue) + }; + (@handle_jump) => { + {} // Treat `Jump` like `Continue`, do nothing and continue execution. }; -} -/// This macro is used to determine continuation before a bottom-up closure is applied -/// during visiting traversals. -/// -/// If the function returns [`TreeNodeRecursion::Continue`], the normal execution of the -/// function continues. -/// If it returns [`TreeNodeRecursion::Jump`], the function returns with (propagates up) -/// [`TreeNodeRecursion::Jump`], bypassing further bottom-up closures until a top-down -/// closure is found. -/// In case of [`TreeNodeRecursion::Stop`], the function return with (propagates up) -/// [`TreeNodeRecursion::Stop`] and recursion halts. -#[macro_export] -macro_rules! handle_visit_recursion_up { - ($EXPR:expr) => { + // Main macro logic with variables to handle directionality. + ($EXPR:expr $(, $DIRECTION:ident)?) => { match $EXPR { TreeNodeRecursion::Continue => {} - TreeNodeRecursion::Jump => return Ok(TreeNodeRecursion::Jump), + TreeNodeRecursion::Jump => handle_visit_recursion!(@handle_jump $($DIRECTION)?), TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop), } }; } -/// This macro is used to determine continuation during top-down transforming traversals. -/// -/// After the bottom-up closure returns with [`Transformed`] depending on the returned -/// [`TreeNodeRecursion`], [`Transformed::try_transform_node_with()`] decides about recursion -/// continuation and [`TreeNodeRecursion`] state propagation. -#[macro_export] -macro_rules! 
handle_transform_recursion_down {
-    ($F_DOWN:expr, $F_SELF:expr) => {
-        $F_DOWN?.try_transform_node_with(
-            |n| n.map_children($F_SELF),
-            TreeNodeRecursion::Continue,
-        )
-    };
-}
-
-/// This macro is used to determine continuation during combined transforming traversals.
+/// This macro is used to determine continuation during combined transforming
+/// traversals.
 ///
-/// After the bottom-up closure returns with [`Transformed`] depending on the returned
-/// [`TreeNodeRecursion`], [`Transformed::try_transform_node_with()`] decides about recursion
-/// continuation and if [`TreeNodeRecursion`] state propagation is needed.
-/// And then after recursing into children returns with [`Transformed`] depending on the
-/// returned [`TreeNodeRecursion`], [`Transformed::try_transform_node_with()`] decides about recursion
-/// continuation and [`TreeNodeRecursion`] state propagation.
-#[macro_export]
+/// Depending on the [`TreeNodeRecursion`] the top-down closure returns,
+/// [`Transformed::try_transform_node_with()`] decides recursion continuation
+/// and if state propagation is necessary. Then, the same procedure recursively
+/// applies to the children of the node in question.
 macro_rules! handle_transform_recursion {
     ($F_DOWN:expr, $F_SELF:expr, $F_UP:expr) => {{
         let pre_visited = $F_DOWN?;
@@ -112,35 +76,20 @@ macro_rules! handle_transform_recursion {
             .data
             .map_children($F_SELF)?
             .try_transform_node_with($F_UP, TreeNodeRecursion::Jump),
-            TreeNodeRecursion::Jump =>
-            {
-                #[allow(clippy::redundant_closure_call)]
-                $F_UP(pre_visited.data)
-            }
+            #[allow(clippy::redundant_closure_call)]
+            TreeNodeRecursion::Jump => $F_UP(pre_visited.data),
             TreeNodeRecursion::Stop => return Ok(pre_visited),
         }
-        .map(|post_visited| post_visited.update_transformed(pre_visited.transformed))
+        .map(|mut post_visited| {
+            post_visited.transformed |= pre_visited.transformed;
+            post_visited
+        })
     }};
 }
 
-/// This macro is used to determine continuation during bottom-up transforming traversals.
-///
-/// After recursing into children returns with [`Transformed`] depending on the returned
-/// [`TreeNodeRecursion`], [`Transformed::try_transform_node_with()`] decides about recursion
-/// continuation and [`TreeNodeRecursion`] state propagation.
-#[macro_export]
-macro_rules! handle_transform_recursion_up {
-    ($NODE:expr, $F_SELF:expr, $F_UP:expr) => {
-        $NODE
-            .map_children($F_SELF)?
-            .try_transform_node_with($F_UP, TreeNodeRecursion::Jump)
-    };
-}
-
-/// Defines a visitable and rewriteable a tree node. This trait is
-/// implemented for plans ([`ExecutionPlan`] and [`LogicalPlan`]) as
-/// well as expression trees ([`PhysicalExpr`], [`Expr`]) in
-/// DataFusion
+/// Defines a visitable and rewriteable tree node. This trait is implemented
+/// for plans ([`ExecutionPlan`] and [`LogicalPlan`]) as well as expression
+/// trees ([`PhysicalExpr`], [`Expr`]) in DataFusion.
 ///
 ///
 /// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
@@ -148,17 +97,17 @@ macro_rules! handle_transform_recursion_up {
 /// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html
 /// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
 pub trait TreeNode: Sized {
-    /// Visit the tree node using the given [TreeNodeVisitor]
-    /// It performs a depth first walk of an node and its children.
+    /// Visit the tree node using the given [`TreeNodeVisitor`], performing a
+    /// depth-first walk of the node and its children.
     ///
-    /// For an node tree such as
+    /// Consider the following tree structure:
     /// ```text
     /// ParentNode
     ///    left: ChildNode1
     ///    right: ChildNode2
     /// ```
     ///
-    /// The nodes are visited using the following order
+    /// Here, the nodes would be visited using the following order:
     /// ```text
     /// TreeNodeVisitor::f_down(ParentNode)
     /// TreeNodeVisitor::f_down(ChildNode1)
@@ -168,21 +117,22 @@ pub trait TreeNode: Sized {
     /// TreeNodeVisitor::f_up(ParentNode)
     /// ```
     ///
-    /// See [`TreeNodeRecursion`] for more details on how the traversal can be controlled.
+    /// See [`TreeNodeRecursion`] for more details on controlling the traversal.
     ///
     /// If [`TreeNodeVisitor::f_down()`] or [`TreeNodeVisitor::f_up()`] returns [`Err`],
-    /// recursion is stopped immediately.
+    /// the recursion stops immediately.
     ///
-    /// If using the default [`TreeNodeVisitor::f_up`] that does
-    /// nothing, [`Self::apply`] should be preferred.
+    /// If using the default [`TreeNodeVisitor::f_up`] that does nothing, consider using
+    /// [`Self::apply`].
     fn visit<V: TreeNodeVisitor<Node = Self>>(
         &self,
         visitor: &mut V,
     ) -> Result<TreeNodeRecursion> {
         match visitor.f_down(self)? {
             TreeNodeRecursion::Continue => {
-                handle_visit_recursion_up!(
-                    self.apply_children(&mut |n| n.visit(visitor))?
+                handle_visit_recursion!(
+                    self.apply_children(&mut |n| n.visit(visitor))?,
+                    UP
                 );
                 visitor.f_up(self)
             }
@@ -194,14 +144,14 @@ pub trait TreeNode: Sized {
     /// Implements the [visitor pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for
     /// recursively transforming [`TreeNode`]s.
     ///
-    /// E.g. for an tree such as:
+    /// Consider the following tree structure:
     /// ```text
    /// ParentNode
    ///    left: ChildNode1
    ///    right: ChildNode2
    /// ```
    ///
-    /// The nodes are visited using the following order:
+    /// Here, the nodes would be visited using the following order:
    /// ```text
    /// TreeNodeRewriter::f_down(ParentNode)
    /// TreeNodeRewriter::f_down(ChildNode1)
@@ -211,10 +161,10 @@ pub trait TreeNode: Sized {
    /// TreeNodeRewriter::f_up(ParentNode)
    /// ```
    ///
-    /// See [`TreeNodeRecursion`] for more details on how the traversal can be controlled.
+    /// See [`TreeNodeRecursion`] for more details on controlling the traversal.
    ///
-    /// If [`TreeNodeRewriter::f_down()`] or [`TreeNodeRewriter::f_up()`] returns [`Err`],
-    /// recursion is stopped immediately.
+    /// If [`TreeNodeRewriter::f_down()`] or [`TreeNodeRewriter::f_up()`] returns [`Err`],
+    /// the recursion stops immediately.
     fn rewrite<R: TreeNodeRewriter<Node = Self>>(
         self,
         rewriter: &mut R,
     ) -> Result<Transformed<Self>> {
@@ -224,79 +174,90 @@
         })
     }
 
-    /// Applies `f` to the node and its children. `f` is applied in a preoder way,
-    /// and it is controlled by [`TreeNodeRecursion`], which means result of the `f`
-    /// on the self node can cause an early return.
+    /// Applies `f` to the node and its children. `f` is applied in a pre-order
+    /// way, and it is controlled by [`TreeNodeRecursion`], which means the result
+    /// of `f` on a node can cause an early return.
     ///
-    /// The `f` closure can be used to collect some info from the
-    /// tree node or do some checking for the tree node.
+    /// The `f` closure can be used to collect some information from tree nodes
+    /// or run a check on the tree.
     fn apply<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
         &self,
         f: &mut F,
     ) -> Result<TreeNodeRecursion> {
-        handle_visit_recursion_down!(f(self)?);
+        handle_visit_recursion!(f(self)?, DOWN);
         self.apply_children(&mut |n| n.apply(f))
     }
 
-    /// Convenience utils for writing optimizers rule: recursively apply the given `f` to the node tree.
-    /// When `f` does not apply to a given node, it is left unchanged.
-    /// The default tree traversal direction is transform_up(Postorder Traversal).
-    fn transform<F>(self, f: &F) -> Result<Transformed<Self>>
-    where
-        F: Fn(Self) -> Result<Transformed<Self>>,
-    {
+    /// Convenience utility for writing optimizer rules: Recursively apply the
+    /// given function `f` to the tree in a bottom-up (post-order) fashion. When
+    /// `f` does not apply to a given node, it is left unchanged.
+    fn transform<F: Fn(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: &F,
+    ) -> Result<Transformed<Self>> {
         self.transform_up(f)
     }
 
-    /// Convenience utils for writing optimizers rule: recursively apply the given 'f' to the node and all of its
-    /// children(Preorder Traversal).
-    /// When the `f` does not apply to a given node, it is left unchanged.
-    fn transform_down<F>(self, f: &F) -> Result<Transformed<Self>>
-    where
-        F: Fn(Self) -> Result<Transformed<Self>>,
-    {
-        handle_transform_recursion_down!(f(self), |c| c.transform_down(f))
+    /// Convenience utility for writing optimizer rules: Recursively apply the
+    /// given function `f` to a node and then to its children (pre-order traversal).
+    /// When `f` does not apply to a given node, it is left unchanged.
+    fn transform_down<F: Fn(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: &F,
+    ) -> Result<Transformed<Self>> {
+        f(self)?.try_transform_node_with(
+            |n| n.map_children(|c| c.transform_down(f)),
+            TreeNodeRecursion::Continue,
+        )
     }
 
-    /// Convenience utils for writing optimizers rule: recursively apply the given 'f' to the node and all of its
-    /// children(Preorder Traversal) using a mutable function, `F`.
-    /// When the `f` does not apply to a given node, it is left unchanged.
-    fn transform_down_mut<F>(self, f: &mut F) -> Result<Transformed<Self>>
-    where
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    {
-        handle_transform_recursion_down!(f(self), |c| c.transform_down_mut(f))
+    /// Convenience utility for writing optimizer rules: Recursively apply the
+    /// given mutable function `f` to a node and then to its children (pre-order
+    /// traversal). When `f` does not apply to a given node, it is left unchanged.
+    fn transform_down_mut<F: FnMut(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: &mut F,
+    ) -> Result<Transformed<Self>> {
+        f(self)?.try_transform_node_with(
+            |n| n.map_children(|c| c.transform_down_mut(f)),
+            TreeNodeRecursion::Continue,
+        )
    }
 
-    /// Convenience utils for writing optimizers rule: recursively apply the given 'f' first to all of its
-    /// children and then itself(Postorder Traversal).
-    /// When the `f` does not apply to a given node, it is left unchanged.
-    fn transform_up<F>(self, f: &F) -> Result<Transformed<Self>>
-    where
-        F: Fn(Self) -> Result<Transformed<Self>>,
-    {
-        handle_transform_recursion_up!(self, |c| c.transform_up(f), f)
+    /// Convenience utility for writing optimizer rules: Recursively apply the
+    /// given function `f` to all children of a node, and then to the node itself
+    /// (post-order traversal). When `f` does not apply to a given node, it is
+    /// left unchanged.
+    fn transform_up<F: Fn(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: &F,
+    ) -> Result<Transformed<Self>> {
+        self.map_children(|c| c.transform_up(f))?
+            .try_transform_node_with(f, TreeNodeRecursion::Jump)
    }
 
-    /// Convenience utils for writing optimizers rule: recursively apply the given 'f' first to all of its
-    /// children and then itself(Postorder Traversal) using a mutable function, `F`.
-    /// When the `f` does not apply to a given node, it is left unchanged.
-    fn transform_up_mut<F>(self, f: &mut F) -> Result<Transformed<Self>>
-    where
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    {
-        handle_transform_recursion_up!(self, |c| c.transform_up_mut(f), f)
+    /// Convenience utility for writing optimizer rules: Recursively apply the
+    /// given mutable function `f` to all children of a node, and then to the
+    /// node itself (post-order traversal). 
When `f` does not apply to a given + /// node, it is left unchanged. + fn transform_up_mut Result>>( + self, + f: &mut F, + ) -> Result> { + self.map_children(|c| c.transform_up_mut(f))? + .try_transform_node_with(f, TreeNodeRecursion::Jump) } /// Transforms the tree using `f_down` while traversing the tree top-down - /// (pre-preorder) and using `f_up` while traversing the tree bottom-up (post-order). + /// (pre-order), and using `f_up` while traversing the tree bottom-up + /// (post-order). /// /// Use this method if you want to start the `f_up` process right where `f_down` jumps. /// This can make the whole process faster by reducing the number of `f_up` steps. /// If you don't need this, it's just like using `transform_down_mut` followed by /// `transform_up_mut` on the same tree. /// - /// E.g. for an tree such as: + /// Consider the following tree structure: /// ```text /// ParentNode /// left: ChildNode1 @@ -313,78 +274,77 @@ pub trait TreeNode: Sized { /// f_up(ParentNode) /// ``` /// - /// See [`TreeNodeRecursion`] for more details on how the traversal can be controlled. + /// See [`TreeNodeRecursion`] for more details on controlling the traversal. /// - /// If `f_down` or `f_up` returns [`Err`], recursion is stopped immediately. + /// If `f_down` or `f_up` returns [`Err`], the recursion stops immediately. /// /// Example: /// ```text - /// | +---+ - /// | | J | - /// | +---+ - /// | | - /// | +---+ - /// TreeNodeRecursion::Continue | | I | - /// | +---+ - /// | | - /// | +---+ - /// \|/ | F | - /// ' +---+ - /// / \ ___________________ - /// When `f_down` is +---+ \ ---+ - /// applied on node "E", | E | | G | - /// it returns with "jump". +---+ +---+ - /// | | - /// +---+ +---+ - /// | C | | H | - /// +---+ +---+ - /// / \ - /// +---+ +---+ - /// | B | | D | - /// +---+ +---+ - /// | - /// +---+ - /// | A | - /// +---+ + /// | +---+ + /// | | J | + /// | +---+ + /// | | + /// | +---+ + /// TreeNodeRecursion::Continue | | I | + /// | +---+ + /// | | + /// | +---+ + /// \|/ | F | + /// ' +---+ + /// / \ ___________________ + /// When `f_down` is +---+ \ ---+ + /// applied on node "E", | E | | G | + /// it returns with "Jump". +---+ +---+ + /// | | + /// +---+ +---+ + /// | C | | H | + /// +---+ +---+ + /// / \ + /// +---+ +---+ + /// | B | | D | + /// +---+ +---+ + /// | + /// +---+ + /// | A | + /// +---+ /// - /// Instead of starting from leaf nodes, `f_up` starts from the node "E". - /// +---+ - /// | | J | - /// | +---+ - /// | | - /// | +---+ - /// | | I | - /// | +---+ - /// | | - /// / +---+ - /// / | F | - /// / +---+ - /// / / \ ______________________ - /// | +---+ . \ ---+ - /// | | E | /|\ After `f_down` jumps | G | - /// | +---+ | on node E, `f_up` +---+ - /// \------| ---/ if applied on node E. | - /// +---+ +---+ - /// | C | | H | - /// +---+ +---+ - /// / \ - /// +---+ +---+ - /// | B | | D | - /// +---+ +---+ - /// | - /// +---+ - /// | A | - /// +---+ - /// ``` - fn transform_down_up( + /// Instead of starting from leaf nodes, `f_up` starts from the node "E". + /// +---+ + /// | | J | + /// | +---+ + /// | | + /// | +---+ + /// | | I | + /// | +---+ + /// | | + /// / +---+ + /// / | F | + /// / +---+ + /// / / \ ______________________ + /// | +---+ . \ ---+ + /// | | E | /|\ After `f_down` jumps | G | + /// | +---+ | on node E, `f_up` +---+ + /// \------| ---/ if applied on node E. 
| + /// +---+ +---+ + /// | C | | H | + /// +---+ +---+ + /// / \ + /// +---+ +---+ + /// | B | | D | + /// +---+ +---+ + /// | + /// +---+ + /// | A | + /// +---+ + /// ``` + fn transform_down_up< + FD: FnMut(Self) -> Result>, + FU: FnMut(Self) -> Result>, + >( self, f_down: &mut FD, f_up: &mut FU, - ) -> Result> - where - FD: FnMut(Self) -> Result>, - FU: FnMut(Self) -> Result>, - { + ) -> Result> { handle_transform_recursion!( f_down(self), |c| c.transform_down_up(f_down, f_up), @@ -392,72 +352,60 @@ pub trait TreeNode: Sized { ) } - /// Apply the closure `F` to the node's children - fn apply_children(&self, f: &mut F) -> Result - where - F: FnMut(&Self) -> Result; + /// Apply the closure `F` to the node's children. + fn apply_children Result>( + &self, + f: &mut F, + ) -> Result; - /// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder) - fn map_children(self, f: F) -> Result> - where - F: FnMut(Self) -> Result>; + /// Apply transform `F` to the node's children. Note that the transform `F` + /// might have a direction (pre-order or post-order). + fn map_children Result>>( + self, + f: F, + ) -> Result>; } -/// Implements the [visitor -/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for recursively walking [`TreeNode`]s. -/// -/// [`TreeNodeVisitor`] allows keeping the algorithms -/// separate from the code to traverse the structure of the `TreeNode` -/// tree and makes it easier to add new types of tree node and -/// algorithms. -/// -/// When passed to[`TreeNode::visit`], [`TreeNodeVisitor::f_down`] -/// and [`TreeNodeVisitor::f_up`] are invoked recursively -/// on an node tree. +/// Implements the [visitor pattern](https://en.wikipedia.org/wiki/Visitor_pattern) +/// for recursively walking [`TreeNode`]s. /// -/// If an [`Err`] result is returned, recursion is stopped -/// immediately. +/// A [`TreeNodeVisitor`] allows one to express algorithms separately from the +/// code traversing the structure of the `TreeNode` tree, making it easier to +/// add new types of tree nodes and algorithms. /// -/// If [`TreeNodeRecursion::Stop`] is returned on a call to pre_visit, no -/// children of that tree node are visited, nor is post_visit -/// called on that tree node -/// -/// If [`TreeNodeRecursion::Stop`] is returned on a call to post_visit, no -/// siblings of that tree node are visited, nor is post_visit -/// called on its parent tree node -/// -/// If [`TreeNodeRecursion::Jump`] is returned on a call to pre_visit, no -/// children of that tree node are visited. +/// When passed to [`TreeNode::visit`], [`TreeNodeVisitor::f_down`] and +/// [`TreeNodeVisitor::f_up`] are invoked recursively on the tree. +/// See [`TreeNodeRecursion`] for more details on controlling the traversal. pub trait TreeNodeVisitor: Sized { /// The node type which is visitable. type Node: TreeNode; /// Invoked before any children of `node` are visited. - /// Default implementation returns the node unmodified and continues recursion. + /// Default implementation simply continues the recursion. fn f_down(&mut self, _node: &Self::Node) -> Result { Ok(TreeNodeRecursion::Continue) } - /// Invoked after all children of `node` are visited. Default - /// implementation does nothing. + /// Invoked after all children of `node` are visited. + /// Default implementation simply continues the recursion. fn f_up(&mut self, _node: &Self::Node) -> Result { Ok(TreeNodeRecursion::Continue) } } -/// Trait for potentially recursively transform a [`TreeNode`] node tree. 
+/// Trait for potentially recursively transforming a tree of [`TreeNode`]s.
 pub trait TreeNodeRewriter: Sized {
     /// The node type which is rewritable.
     type Node: TreeNode;
 
     /// Invoked while traversing down the tree before any children are rewritten.
-    /// Default implementation returns the node unmodified and continues recursion.
+    /// Default implementation returns the node as is and continues recursion.
     fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
         Ok(Transformed::no(node))
     }
 
     /// Invoked while traversing up the tree after all children have been rewritten.
-    /// Default implementation returns the node unmodified.
+    /// Default implementation returns the node as is and continues recursion.
     fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
         Ok(Transformed::no(node))
     }
 }
@@ -468,37 +416,42 @@ pub trait TreeNodeRewriter: Sized {
 pub enum TreeNodeRecursion {
     /// Continue recursion with the next node.
     Continue,
-
-    /// In top-down traversals, skip recursing into children but continue with the next
-    /// node, which actually means pruning of the subtree.
+    /// In top-down traversals, skip recursing into children but continue with
+    /// the next node, which actually means pruning of the subtree.
     ///
-    /// In bottom-up traversals, bypass calling bottom-up closures till the next leaf node.
+    /// In bottom-up traversals, bypass calling bottom-up closures till the next
+    /// leaf node.
     ///
-    /// In combined traversals, if it is "f_down" (pre-order) phase, execution "jumps" to
-    /// next "f_up" (post_order) phase by shortcutting its children. If it is "f_up" (pre-order)
-    /// phase, execution "jumps" to next "f_down" (pre_order) phase by shortcutting its parent
-    /// nodes until the first parent node having unvisited children path.
+    /// In combined traversals, if it is the `f_down` (pre-order) phase, execution
+    /// "jumps" to the next `f_up` (post-order) phase by shortcutting its children.
+    /// If it is the `f_up` (post-order) phase, execution "jumps" to the next `f_down`
+    /// (pre-order) phase by shortcutting its parent nodes until the first parent
+    /// node that has unvisited children.
     Jump,
-
     /// Stop recursion.
     Stop,
 }
 
-/// This struct is used with [`TreeNode::rewrite`], [`TreeNode::transform_down`],
-/// [`TreeNode::transform_down_mut`], [`TreeNode::transform_up`],
-/// [`TreeNode::transform_up_mut`] and [`TreeNode::transform_down_up`] methods to control
-/// transformations and return the transformed result.
+/// This struct is used by tree transformation APIs such as
+/// - [`TreeNode::rewrite`],
+/// - [`TreeNode::transform_down`],
+/// - [`TreeNode::transform_down_mut`],
+/// - [`TreeNode::transform_up`],
+/// - [`TreeNode::transform_up_mut`],
+/// - [`TreeNode::transform_down_up`]
 ///
-/// API users can provide transformation closures and [`TreeNodeRewriter`]
-/// implementations to control transformation by specifying:
-/// - the possibly transformed node,
-/// - if any change was made to the node and
-/// - how to proceed with the recursion.
+/// to control the transformation and return the transformed result.
 ///
-/// The APIs return this struct with the:
-/// - final possibly transformed tree,
-/// - if any change was made to any node and
-/// - how the recursion ended.
+/// Specifically, API users can provide transformation closures or [`TreeNodeRewriter`]
+/// implementations to control the transformation by returning:
+/// - The resulting (possibly transformed) node,
+/// - A flag indicating whether any change was made to the node, and
+/// - A flag specifying how to proceed with the recursion.
+///
+/// At the end of the transformation, the return value will contain:
+/// - The final (possibly transformed) tree,
+/// - A flag indicating whether any change was made to the tree, and
+/// - A flag specifying how the recursion ended.
 #[derive(PartialEq, Debug)]
 pub struct Transformed<T> {
     pub data: T,
@@ -507,6 +460,7 @@ pub struct Transformed<T> {
 }
 
 impl<T> Transformed<T> {
+    /// Create a new `Transformed` object with the given information.
     pub fn new(data: T, transformed: bool, tnr: TreeNodeRecursion) -> Self {
         Self {
             data,
@@ -517,115 +471,99 @@ impl<T> Transformed<T> {
 
     /// Wrapper for transformed data with [`TreeNodeRecursion::Continue`] statement.
     pub fn yes(data: T) -> Self {
-        Self {
-            data,
-            transformed: true,
-            tnr: TreeNodeRecursion::Continue,
-        }
+        Self::new(data, true, TreeNodeRecursion::Continue)
     }
 
-    /// Wrapper for non-transformed data with [`TreeNodeRecursion::Continue`] statement.
+    /// Wrapper for unchanged data with [`TreeNodeRecursion::Continue`] statement.
     pub fn no(data: T) -> Self {
-        Self {
-            data,
-            transformed: false,
-            tnr: TreeNodeRecursion::Continue,
-        }
+        Self::new(data, false, TreeNodeRecursion::Continue)
     }
 
-    /// Applies the given `f` to the data of [`Transformed`] object.
+    /// Applies the given `f` to the data of this [`Transformed`] object.
     pub fn update_data<U, F: FnOnce(T) -> U>(self, f: F) -> Transformed<U> {
         Transformed::new(f(self.data), self.transformed, self.tnr)
     }
 
-    /// Updates the transformed state based on the current and the new state.
-    pub fn update_transformed(self, transformed: bool) -> Self {
-        Self {
-            transformed: self.transformed || transformed,
-            ..self
-        }
-    }
-
-    /// Sets a new [`TreeNodeRecursion`].
-    pub fn update_tnr(self, tnr: TreeNodeRecursion) -> Self {
-        Self { tnr, ..self }
-    }
-
     /// Maps the data of [`Transformed`] object to the result of the given `f`.
     pub fn map_data<U, F: FnOnce(T) -> Result<U>>(self, f: F) -> Result<Transformed<U>> {
         f(self.data).map(|data| Transformed::new(data, self.transformed, self.tnr))
     }
 
-    /// Handling [`TreeNodeRecursion::Continue`] and [`TreeNodeRecursion::Stop`] is
-    /// straightforward, but [`TreeNodeRecursion::Jump`] can behave differently when we
-    /// are traversing down or up on a tree.
-    /// If [`TreeNodeRecursion`] of the node is [`TreeNodeRecursion::Jump`] recursion is
-    /// stopped with the given `return_if_jump` [`TreeNodeRecursion`] statement.
+    /// Handling [`TreeNodeRecursion::Continue`] and [`TreeNodeRecursion::Stop`]
+    /// is straightforward, but [`TreeNodeRecursion::Jump`] can behave differently
+    /// when we are traversing down or up on a tree. If [`TreeNodeRecursion`] of
+    /// the node is [`TreeNodeRecursion::Jump`], recursion stops with the given
+    /// `return_if_jump` value.
     fn try_transform_node_with<F: FnOnce(T) -> Result<Transformed<T>>>(
         mut self,
         f: F,
         return_if_jump: TreeNodeRecursion,
     ) -> Result<Transformed<T>> {
         match self.tnr {
             TreeNodeRecursion::Continue => {
-                f(self.data).map(|t| t.update_transformed(self.transformed))
+                return f(self.data).map(|mut t| {
+                    t.transformed |= self.transformed;
+                    t
+                });
             }
-            TreeNodeRecursion::Jump => Ok(self.update_tnr(return_if_jump)),
-            TreeNodeRecursion::Stop => Ok(self),
+            TreeNodeRecursion::Jump => {
+                self.tnr = return_if_jump;
+            }
+            TreeNodeRecursion::Stop => {}
         }
+        Ok(self)
     }
 
     /// If [`TreeNodeRecursion`] of the node is [`TreeNodeRecursion::Continue`] or
-    /// [`TreeNodeRecursion::Jump`], transformation is applied to the node. Otherwise, it
-    /// remains as it is.
+    /// [`TreeNodeRecursion::Jump`], transformation is applied to the node.
+    /// Otherwise, it remains as it is.
     pub fn try_transform_node<F: FnOnce(T) -> Result<Transformed<T>>>(
         self,
         f: F,
     ) -> Result<Transformed<T>> {
-        match self.tnr {
-            TreeNodeRecursion::Continue => {}
-            TreeNodeRecursion::Jump => {}
-            TreeNodeRecursion::Stop => return Ok(self),
-        };
-        f(self.data).map(|t| t.update_transformed(self.transformed))
+        if self.tnr == TreeNodeRecursion::Stop {
+            Ok(self)
+        } else {
+            f(self.data).map(|mut t| {
+                t.transformed |= self.transformed;
+                t
+            })
+        }
     }
 }
 
 /// Transformation helper to process tree nodes that are siblings.
 pub trait TransformedIterator: Iterator {
-    fn map_until_stop_and_collect<F>(self, f: F) -> Result<Transformed<Vec<Self::Item>>>
-    where
-        F: FnMut(Self::Item) -> Result<Transformed<Self::Item>>;
+    fn map_until_stop_and_collect<
+        F: FnMut(Self::Item) -> Result<Transformed<Self::Item>>,
+    >(
+        self,
+        f: F,
+    ) -> Result<Transformed<Vec<Self::Item>>>;
 }
 
 impl<I: Iterator> TransformedIterator for I {
-    fn map_until_stop_and_collect<F>(
+    fn map_until_stop_and_collect<
+        F: FnMut(Self::Item) -> Result<Transformed<Self::Item>>,
+    >(
         self,
         mut f: F,
-    ) -> Result<Transformed<Vec<Self::Item>>>
-    where
-        F: FnMut(Self::Item) -> Result<Transformed<Self::Item>>,
-    {
-        let mut new_tnr = TreeNodeRecursion::Continue;
-        let mut new_transformed = false;
-        let new_data = self
-            .map(|i| {
-                Ok(match new_tnr {
-                    TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {
-                        let Transformed {
-                            data,
-                            transformed,
-                            tnr,
-                        } = f(i)?;
-                        new_tnr = tnr;
-                        new_transformed |= transformed;
-                        data
-                    }
-                    TreeNodeRecursion::Stop => i,
-                })
+    ) -> Result<Transformed<Vec<Self::Item>>> {
+        let mut tnr = TreeNodeRecursion::Continue;
+        let mut transformed = false;
+        let data = self
+            .map(|item| match tnr {
+                TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {
+                    f(item).map(|result| {
+                        tnr = result.tnr;
+                        transformed |= result.transformed;
+                        result.data
+                    })
+                }
+                TreeNodeRecursion::Stop => Ok(item),
             })
-            .collect::<Result<Vec<_>>>()?;
-        Ok(Transformed::new(new_data, new_transformed, new_tnr))
+            .collect::<Result<Vec<_>>>()?;
+        Ok(Transformed::new(data, transformed, tnr))
     }
 }
 
@@ -652,15 +590,14 @@ impl<T> TransformedResult<T> for Result<Transformed<T>> {
     }
 }
 
-/// Helper trait for implementing [`TreeNode`] that have children stored as Arc's
-///
-/// If some trait object, such as `dyn T`, implements this trait,
-/// its related `Arc<dyn T>` will automatically implement [`TreeNode`]
+/// Helper trait for implementing [`TreeNode`] that have children stored as
+/// `Arc`s. If some trait object, such as `dyn T`, implements this trait,
+/// its related `Arc<dyn T>` will automatically implement [`TreeNode`].
 pub trait DynTreeNode {
-    /// Returns all children of the specified TreeNode
+    /// Returns all children of the specified `TreeNode`.
     fn arc_children(&self) -> Vec<Arc<Self>>;
 
-    /// construct a new self with the specified children
+    /// Constructs a new node with the specified children.
     fn with_new_arc_children(
         &self,
         arc_self: Arc<Self>,
@@ -668,14 +605,13 @@ pub trait DynTreeNode {
     ) -> Result<Arc<Self>>;
 }
 
-/// Blanket implementation for Arc for any tye that implements
-/// [`DynTreeNode`] (such as [`Arc<dyn PhysicalExpr>`])
+/// Blanket implementation for any `Arc<T>` where `T` implements [`DynTreeNode`]
+/// (such as [`Arc<dyn PhysicalExpr>`]).
 impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
-    /// Apply the closure `F` to the node's children
-    fn apply_children<F>(&self, f: &mut F) -> Result<TreeNodeRecursion>
-    where
-        F: FnMut(&Self) -> Result<TreeNodeRecursion>,
-    {
+    fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+        &self,
+        f: &mut F,
+    ) -> Result<TreeNodeRecursion> {
         let mut tnr = TreeNodeRecursion::Continue;
         for child in self.arc_children() {
             tnr = f(&child)?;
@@ -684,10 +620,10 @@ impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
         Ok(tnr)
     }
 
-    fn map_children<F>(self, f: F) -> Result<Transformed<Self>>
-    where
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    {
+    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: F,
+    ) -> Result<Transformed<Self>> {
         let children = self.arc_children();
         if !children.is_empty() {
             let new_children = children.into_iter().map_until_stop_and_collect(f)?;
@@ -722,11 +658,10 @@ pub trait ConcreteTreeNode: Sized {
 }
 
 impl<T: ConcreteTreeNode> TreeNode for T {
-    /// Apply the closure `F` to the node's children
-    fn apply_children<F>(&self, f: &mut F) -> Result<TreeNodeRecursion>
-    where
-        F: FnMut(&Self) -> Result<TreeNodeRecursion>,
-    {
+    fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+        &self,
+        f: &mut F,
+    ) -> Result<TreeNodeRecursion> {
         let mut tnr = TreeNodeRecursion::Continue;
         for child in self.children() {
             tnr = f(child)?;
@@ -735,10 +670,10 @@ impl<T: ConcreteTreeNode> TreeNode for T {
         Ok(tnr)
     }
 
-    fn map_children<F>(self, f: F) -> Result<Transformed<Self>>
-    where
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    {
+    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: F,
+    ) -> Result<Transformed<Self>> {
         let (new_self, children) = self.take_children();
         if !children.is_empty() {
             let new_children = children.into_iter().map_until_stop_and_collect(f)?;
@@ -753,12 +688,13 @@
 
 #[cfg(test)]
 mod tests {
+    use std::fmt::Display;
+
     use crate::tree_node::{
         Transformed, TransformedIterator, TreeNode, TreeNodeRecursion,
         TreeNodeRewriter, TreeNodeVisitor,
     };
     use crate::Result;
-    use std::fmt::Display;
 
     #[derive(PartialEq, Debug)]
     struct TestTreeNode<T> {
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 78955104c72a..eef25792d00a 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -19,29 +19,27 @@
 
 use std::sync::Arc;
 
-use arrow::compute::{and, cast, prep_null_mask_filter};
+use super::PartitionedFile;
+use crate::datasource::listing::ListingTableUrl;
+use crate::execution::context::SessionState;
+use crate::{error::Result, scalar::ScalarValue};
+
 use arrow::{
-    array::{ArrayRef, StringBuilder},
+    array::{Array, ArrayRef, AsArray, StringBuilder},
+    compute::{and, cast, prep_null_mask_filter},
     datatypes::{DataType, Field, Schema},
     record_batch::RecordBatch,
 };
-use arrow_array::cast::AsArray;
-use arrow_array::Array;
 use arrow_schema::Fields;
-use futures::stream::FuturesUnordered;
-use futures::{stream::BoxStream, StreamExt, TryStreamExt};
-use log::{debug, trace};
-
-use crate::{error::Result, scalar::ScalarValue};
-
-use super::PartitionedFile;
-use crate::datasource::listing::ListingTableUrl;
-use crate::execution::context::SessionState;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError};
 use datafusion_expr::{Expr, ScalarFunctionDefinition, Volatility};
 use datafusion_physical_expr::create_physical_expr;
 use datafusion_physical_expr::execution_props::ExecutionProps;
+
+use futures::stream::{BoxStream, FuturesUnordered};
+use 
futures::{StreamExt, TryStreamExt}; +use log::{debug, trace}; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 06687dde6baf..c0e37a7150d9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -15,28 +15,28 @@ // specific language governing permissions and limitations // under the License. +use std::collections::BTreeSet; +use std::sync::Arc; + +use super::ParquetFileMetrics; +use crate::physical_plan::metrics; + use arrow::array::BooleanArray; use arrow::datatypes::{DataType, Schema}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; use datafusion_common::{arrow_err, DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::utils::reassign_predicate_columns; -use std::collections::BTreeSet; - use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; + use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; use parquet::arrow::ProjectionMask; use parquet::file::metadata::ParquetMetaData; -use std::sync::Arc; - -use crate::physical_plan::metrics; - -use super::ParquetFileMetrics; /// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which /// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`. @@ -190,8 +190,7 @@ impl<'a> FilterCandidateBuilder<'a> { mut self, metadata: &ParquetMetaData, ) -> Result> { - let expr = self.expr.clone(); - let expr = expr.rewrite(&mut self)?.data; + let expr = self.expr.clone().rewrite(&mut self).data()?; if self.non_primitive_columns || self.projected_columns { Ok(None) diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 01213ed8df1a..7c0082037da0 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -18,8 +18,10 @@ //! CoalesceBatches optimizer that groups batches together rows //! 
in bigger batches to avoid overhead with small batches -use crate::config::ConfigOptions; +use std::sync::Arc; + use crate::{ + config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, physical_plan::{ @@ -27,8 +29,8 @@ use crate::{ repartition::RepartitionExec, Partitioning, }, }; + use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use std::sync::Arc; /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that /// are produced by highly selective filters diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 8b6610e871f7..822cd0541ae2 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -45,7 +45,7 @@ use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning}; use arrow::compute::SortOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::expressions::{Column, NoOp}; use datafusion_physical_expr::utils::map_columns_before_projection; @@ -198,15 +198,15 @@ impl PhysicalOptimizerRule for EnforceDistribution { // Run a top-down process to adjust input key ordering recursively let plan_requirements = PlanWithKeyRequirements::new_default(plan); let adjusted = plan_requirements - .transform_down(&adjust_input_keys_ordering)? - .data; + .transform_down(&adjust_input_keys_ordering) + .data()?; adjusted.plan } else { // Run a bottom-up process plan.transform_up(&|plan| { Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) - })? - .data + }) + .data()? }; let distribution_context = DistributionContext::new_default(adjusted); @@ -214,8 +214,8 @@ impl PhysicalOptimizerRule for EnforceDistribution { let distribution_context = distribution_context .transform_up(&|distribution_context| { ensure_distribution(distribution_context, config) - })? - .data; + }) + .data()?; Ok(distribution_context.plan) } @@ -1788,7 +1788,8 @@ pub(crate) mod tests { let plan_requirements = PlanWithKeyRequirements::new_default($PLAN.clone()); let adjusted = plan_requirements - .transform_down(&adjust_input_keys_ordering).data() + .transform_down(&adjust_input_keys_ordering) + .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. adjusted.plan @@ -1796,14 +1797,16 @@ pub(crate) mod tests { // Run reorder_join_keys_to_inputs rule $PLAN.clone().transform_up(&|plan| { Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) - })?.data + }) + .data()? }; // Then run ensure_distribution rule DistributionContext::new_default(adjusted) .transform_up(&|distribution_context| { ensure_distribution(distribution_context, &config) - }).data() + }) + .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. 
} diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index d31bc9c452b9..79dd5758cc2f 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -165,8 +165,8 @@ impl PhysicalOptimizerRule for EnforceSorting { let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); let parallel = plan_with_coalesce_partitions - .transform_up(¶llelize_sorts)? - .data; + .transform_up(¶llelize_sorts) + .data()?; parallel.plan } else { adjusted.plan @@ -181,8 +181,8 @@ impl PhysicalOptimizerRule for EnforceSorting { true, config, ) - })? - .data; + }) + .data()?; // Execute a top-down traversal to exploit sort push-down opportunities // missed by the bottom-up traversal: @@ -687,7 +687,8 @@ mod tests { { let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone()); let adjusted = plan_requirements - .transform_up(&ensure_sorting).data() + .transform_up(&ensure_sorting) + .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. @@ -695,7 +696,8 @@ mod tests { let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); let parallel = plan_with_coalesce_partitions - .transform_up(¶llelize_sorts).data() + .transform_up(¶llelize_sorts) + .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. parallel.plan @@ -712,14 +714,16 @@ mod tests { true, state.config_options(), ) - }).data() + }) + .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); assign_initial_requirements(&mut sort_pushdown); sort_pushdown - .transform_down(&pushdown_sorts).data() + .transform_down(&pushdown_sorts) + .data() .and_then(check_integrity)?; // TODO: End state payloads will be checked here. } diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index cd710ce46990..47ca2028fd86 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -237,8 +237,8 @@ impl PhysicalOptimizerRule for JoinSelection { Box::new(hash_join_swap_subrule), ]; let new_plan = plan - .transform_up(&|p| apply_subrules(p, &subrules, config))? - .data; + .transform_up(&|p| apply_subrules(p, &subrules, config)) + .data()?; // Next, we apply another subrule that tries to optimize joins using any // statistics their inputs might have. // - For a hash join with partition mode [`PartitionMode::Auto`], we will @@ -813,8 +813,8 @@ mod tests_statistical { Box::new(hash_join_swap_subrule), ]; let new_plan = plan - .transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new()))? - .data; + .transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new())) + .data()?; // TODO: End state payloads will be checked here. 
let config = ConfigOptions::new().optimizer; let collect_left_threshold = config.hash_join_single_partition_threshold; diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index 8b14bf067d3c..9509d4e4c828 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -162,7 +162,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { plan: Arc, config: &ConfigOptions, ) -> Result> { - let plan = if config.optimizer.enable_distinct_aggregation_soft_limit { + if config.optimizer.enable_distinct_aggregation_soft_limit { plan.transform_down(&|plan| { Ok( if let Some(plan) = @@ -173,12 +173,11 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { Transformed::no(plan) }, ) - })? - .data + }) + .data() } else { - plan - }; - Ok(plan) + Ok(plan) + } } fn name(&self) -> &str { diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 5fe0d46b8043..17d30a2b4ec1 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -112,9 +112,9 @@ pub fn remove_unnecessary_projections( let maybe_unified = try_unifying_projections(projection, child_projection)?; return if let Some(new_plan) = maybe_unified { // To unify 3 or more sequential projections: - Ok(Transformed::yes( - remove_unnecessary_projections(new_plan)?.data, - )) + remove_unnecessary_projections(new_plan) + .data() + .map(Transformed::yes) } else { Ok(Transformed::no(plan)) }; diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index b3dd8294d507..05d2d852e057 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -29,21 +29,22 @@ use crate::{ logical_expr::Operator, physical_plan::{ColumnarValue, PhysicalExpr}, }; -use arrow::record_batch::RecordBatchOptions; + use arrow::{ array::{new_null_array, ArrayRef, BooleanArray}, datatypes::{DataType, Field, Schema, SchemaRef}, - record_batch::RecordBatch, + record_batch::{RecordBatch, RecordBatchOptions}, }; use arrow_array::cast::AsArray; use datafusion_common::tree_node::TransformedResult; use datafusion_common::{ - internal_err, plan_err, + internal_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TreeNode}, + ScalarValue, }; -use datafusion_common::{plan_datafusion_err, ScalarValue}; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; + use log::trace; /// A source of runtime statistical information to [`PruningPredicate`]s. 
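The optimizer-rule hunks above and below all migrate call sites from the
`transform_*(...)?.data` form to `transform_*(...).data()?`. A minimal sketch of
the resulting call-site pattern, assuming only the `TreeNode`, `Transformed` and
`TransformedResult` items introduced in this patch (the `Expr` tree and the
identity closure are illustrative, not taken from the diff): the blanket
`TransformedResult` impl lets `.data()` fold a `Result<Transformed<T>>` into a
plain `Result<T>`, so no intermediate binding is needed.

```rust
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_expr::Expr;

/// Identity rewrite: `transform_up` yields `Result<Transformed<Expr>>`;
/// `.data()` discards the traversal metadata and keeps only the tree.
fn noop_rewrite(expr: Expr) -> Result<Expr> {
    expr.transform_up(&|e| Ok(Transformed::no(e))).data()
}
```
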
diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index e85243b30ba1..c47e5e25d143 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -140,7 +140,7 @@ impl PhysicalOptimizerRule for TopKAggregation { plan: Arc, config: &ConfigOptions, ) -> Result> { - let plan = if config.optimizer.enable_topk_aggregation { + if config.optimizer.enable_topk_aggregation { plan.transform_down(&|plan| { Ok( if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) { @@ -149,12 +149,11 @@ impl PhysicalOptimizerRule for TopKAggregation { Transformed::no(plan) }, ) - })? - .data + }) + .data() } else { - plan - }; - Ok(plan) + Ok(plan) + } } fn name(&self) -> &str { diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index c24db11763ea..59905d859dc8 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -17,19 +17,12 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, Int64Array}; +use arrow::array::{Array, ArrayRef, AsArray, Int64Array}; use arrow::compute::{concat_batches, SortOptions}; use arrow::datatypes::DataType; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; -use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; -use arrow_array::Array; -use hashbrown::HashMap; -use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; -use tokio::task::JoinSet; - use datafusion::common::Result; use datafusion::datasource::MemTable; use datafusion::physical_plan::aggregates::{ @@ -44,6 +37,11 @@ use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr}; use datafusion_physical_plan::InputOrderMode; use test_utils::{add_empty_batches, StringBatchGenerator}; +use hashbrown::HashMap; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use tokio::task::JoinSet; + /// Tests that streaming aggregate and batch (non streaming) aggregate produce /// same results #[tokio::test(flavor = "multi_thread")] @@ -316,6 +314,7 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) { impl TreeNodeVisitor for Visitor { type Node = Arc; + fn f_down(&mut self, node: &Self::Node) -> Result { if let Some(exec) = node.as_any().downcast_ref::() { if self.expected_sort { diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 06b276fb41fd..68b123ab1f28 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -17,28 +17,27 @@ //! 
Logical Expressions: [`Expr`] +use std::collections::HashSet; +use std::fmt::{self, Display, Formatter, Write}; +use std::hash::Hash; +use std::str::FromStr; +use std::sync::Arc; + use crate::expr_fn::binary_expr; use crate::logical_plan::Subquery; use crate::utils::{expr_to_columns, find_out_reference_exprs}; use crate::window_frame; +use crate::{ + aggregate_function, built_in_function, built_in_window_function, udaf, + BuiltinScalarFunction, ExprSchemable, Operator, Signature, +}; -use crate::Operator; -use crate::{aggregate_function, ExprSchemable}; -use crate::{built_in_function, BuiltinScalarFunction}; -use crate::{built_in_window_function, udaf}; use arrow::datatypes::DataType; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{internal_err, DFSchema, OwnedTableReference}; -use datafusion_common::{plan_err, Column, Result, ScalarValue}; +use datafusion_common::{ + internal_err, plan_err, Column, DFSchema, OwnedTableReference, Result, ScalarValue, +}; use sqlparser::ast::NullTreatment; -use std::collections::HashSet; -use std::fmt; -use std::fmt::{Display, Formatter, Write}; -use std::hash::Hash; -use std::str::FromStr; -use std::sync::Arc; - -use crate::Signature; /// `Expr` is a central struct of DataFusion's query API, and /// represent logical expressions such as `A + 1`, or `CAST(c1 AS diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 8d7a314a89fe..cd9a8344dec4 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -17,17 +17,18 @@ //! Expression rewriter +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; + use crate::expr::{Alias, Unnest}; use crate::logical_plan::Projection; use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}; + use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRewriter, }; -use datafusion_common::Result; -use datafusion_common::{Column, DFSchema}; -use std::collections::HashMap; -use std::collections::HashSet; -use std::sync::Arc; +use datafusion_common::{Column, DFSchema, Result}; mod order_by; pub use order_by::rewrite_sort_cols_by_aggs; @@ -277,13 +278,15 @@ where #[cfg(test)] mod test { + use std::ops::Add; + use super::*; use crate::expr::Sort; use crate::{col, lit, Cast}; + use arrow::datatypes::DataType; use datafusion_common::tree_node::{TreeNode, TreeNodeRewriter}; use datafusion_common::{DFField, DFSchema, ScalarValue}; - use std::ops::Add; #[derive(Default)] struct RecordingRewriter { diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs b/datafusion/expr/src/expr_rewriter/order_by.rs index 06d1dc061168..b1bc11a83f90 100644 --- a/datafusion/expr/src/expr_rewriter/order_by.rs +++ b/datafusion/expr/src/expr_rewriter/order_by.rs @@ -20,6 +20,7 @@ use crate::expr::{Alias, Sort}; use crate::expr_rewriter::normalize_col; use crate::{Cast, Expr, ExprSchemable, LogicalPlan, TryCast}; + use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Column, Result}; diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 389a33612d4c..e0cb44626e24 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -16,12 +16,14 @@ // under the License. //! 
This module provides logic for displaying LogicalPlans in various styles +use std::fmt; + use crate::LogicalPlan; + use arrow::datatypes::Schema; use datafusion_common::display::GraphvizBuilder; use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::DataFusionError; -use std::fmt; /// Formats plans with a single line per node. For example: /// diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 825d3f037023..ca021c4bfc28 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -664,8 +664,8 @@ impl LogicalPlan { )), _ => Ok(Transformed::no(expr)), } - })? - .data; + }) + .data()?; Filter::try_new(predicate, Arc::new(inputs.swap_remove(0))) .map(LogicalPlan::Filter) diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index 41192a6f29b7..67d48f986f13 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -27,7 +27,7 @@ use crate::{Expr, GetFieldAccess}; use datafusion_common::tree_node::{ Transformed, TransformedIterator, TreeNode, TreeNodeRecursion, }; -use datafusion_common::{handle_visit_recursion_down, internal_err, Result}; +use datafusion_common::{handle_visit_recursion, internal_err, Result}; impl TreeNode for Expr { fn apply_children Result>( @@ -134,7 +134,7 @@ impl TreeNode for Expr { let mut tnr = TreeNodeRecursion::Continue; for child in children { tnr = f(child)?; - handle_visit_recursion_down!(tnr); + handle_visit_recursion!(tnr, DOWN); } Ok(tnr) diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs index 6b2b9d055c81..02d5d1851289 100644 --- a/datafusion/expr/src/tree_node/plan.rs +++ b/datafusion/expr/src/tree_node/plan.rs @@ -22,7 +22,7 @@ use crate::LogicalPlan; use datafusion_common::tree_node::{ Transformed, TransformedIterator, TreeNode, TreeNodeRecursion, TreeNodeVisitor, }; -use datafusion_common::{handle_visit_recursion_down, handle_visit_recursion_up, Result}; +use datafusion_common::{handle_visit_recursion, Result}; impl TreeNode for LogicalPlan { fn apply Result>( @@ -31,7 +31,7 @@ impl TreeNode for LogicalPlan { ) -> Result { // Compared to the default implementation, we need to invoke // [`Self::apply_subqueries`] before visiting its children - handle_visit_recursion_down!(f(self)?); + handle_visit_recursion!(f(self)?, DOWN); self.apply_subqueries(f)?; self.apply_children(&mut |n| n.apply(f)) } @@ -65,8 +65,9 @@ impl TreeNode for LogicalPlan { match visitor.f_down(self)? { TreeNodeRecursion::Continue => { self.visit_subqueries(visitor)?; - handle_visit_recursion_up!( - self.apply_children(&mut |n| n.visit(visitor))? 
+ handle_visit_recursion!( + self.apply_children(&mut |n| n.visit(visitor))?, + UP ); visitor.f_up(self) } @@ -85,7 +86,7 @@ impl TreeNode for LogicalPlan { let mut tnr = TreeNodeRecursion::Continue; for child in self.inputs() { tnr = f(child)?; - handle_visit_recursion_down!(tnr) + handle_visit_recursion!(tnr, DOWN) } Ok(tnr) } @@ -94,8 +95,8 @@ impl TreeNode for LogicalPlan { where F: FnMut(Self) -> Result>, { - let old_children = self.inputs(); - let new_children = old_children + let new_children = self + .inputs() .iter() .map(|&c| c.clone()) .map_until_stop_and_collect(f)?; diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs index 99e32c0bac74..93b24d71c496 100644 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::analyzer::AnalyzerRule; + use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRewriter, @@ -29,7 +32,6 @@ use datafusion_expr::{ aggregate_function, expr, lit, Aggregate, Expr, Filter, LogicalPlan, LogicalPlanBuilder, Projection, Sort, Subquery, }; -use std::sync::Arc; /// Rewrite `Count(Expr:Wildcard)` to `Count(Expr:Literal)`. /// diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index ada7dca45759..b21ec851dfcd 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -20,11 +20,11 @@ use std::sync::Arc; use crate::analyzer::AnalyzerRule; + use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; -use datafusion_expr::expr::Exists; -use datafusion_expr::expr::InSubquery; +use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::{ logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan, }; @@ -51,7 +51,7 @@ impl AnalyzerRule for InlineTableScan { } fn analyze_internal(plan: LogicalPlan) -> Result> { - Ok(match plan { + match plan { // Match only on scans without filter / projection / fetch // Views and DataFrames won't have those added // during the early stage of planning @@ -64,31 +64,29 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { }) if filters.is_empty() && source.get_logical_plan().is_some() => { let sub_plan = source.get_logical_plan().unwrap(); let projection_exprs = generate_projection_expr(&projection, sub_plan)?; - let plan = LogicalPlanBuilder::from(sub_plan.clone()) + LogicalPlanBuilder::from(sub_plan.clone()) .project(projection_exprs)? // Ensures that the reference to the inlined table remains the // same, meaning we don't have to change any of the parent nodes // that reference this table. .alias(table_name)? 
- .build()?; - Transformed::yes(plan) + .build() + .map(Transformed::yes) } LogicalPlan::Filter(filter) => { - let new_expr = filter.predicate.transform(&rewrite_subquery)?.data; - Transformed::yes(LogicalPlan::Filter(Filter::try_new( - new_expr, - filter.input, - )?)) + let new_expr = filter.predicate.transform(&rewrite_subquery).data()?; + Filter::try_new(new_expr, filter.input) + .map(|e| Transformed::yes(LogicalPlan::Filter(e))) } - _ => Transformed::no(plan), - }) + _ => Ok(Transformed::no(plan)), + } } fn rewrite_subquery(expr: Expr) -> Result> { match expr { Expr::Exists(Exists { subquery, negated }) => { let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal)?.data; + let new_plan = plan.transform_up(&analyze_internal).data()?; let subquery = subquery.with_plan(Arc::new(new_plan)); Ok(Transformed::yes(Expr::Exists(Exists { subquery, negated }))) } @@ -98,7 +96,7 @@ fn rewrite_subquery(expr: Expr) -> Result> { negated, }) => { let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal)?.data; + let new_plan = plan.transform_up(&analyze_internal).data()?; let subquery = subquery.with_plan(Arc::new(new_plan)); Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( expr, subquery, negated, @@ -106,7 +104,7 @@ fn rewrite_subquery(expr: Expr) -> Result> { } Expr::ScalarSubquery(subquery) => { let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal)?.data; + let new_plan = plan.transform_up(&analyze_internal).data()?; let subquery = subquery.with_plan(Arc::new(new_plan)); Ok(Transformed::yes(Expr::ScalarSubquery(subquery))) } @@ -135,13 +133,12 @@ fn generate_projection_expr( mod tests { use std::{sync::Arc, vec}; - use arrow::datatypes::{DataType, Field, Schema}; - - use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; - use crate::analyzer::inline_table_scan::InlineTableScan; use crate::test::assert_analyzed_plan_eq; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; + pub struct RawTableSource {} impl TableSource for RawTableSource { diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs index 7bf852f8891c..41ebcd8e501a 100644 --- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs +++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs @@ -19,21 +19,19 @@ use std::sync::Arc; +use super::AnalyzerRule; + use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; use datafusion_common::utils::list_ndims; -use datafusion_common::DFSchema; -use datafusion_common::DFSchemaRef; -use datafusion_common::Result; +use datafusion_common::{DFSchema, DFSchemaRef, Result}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::expr_rewriter::rewrite_preserving_name; use datafusion_expr::utils::merge_schema; -use datafusion_expr::BuiltinScalarFunction; -use datafusion_expr::Operator; -use datafusion_expr::ScalarFunctionDefinition; -use datafusion_expr::{BinaryExpr, Expr, LogicalPlan}; - -use super::AnalyzerRule; +use datafusion_expr::{ + BinaryExpr, BuiltinScalarFunction, Expr, LogicalPlan, Operator, + ScalarFunctionDefinition, +}; #[derive(Default)] pub struct OperatorToFunction {} @@ -97,38 +95,31 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter { type Node = Expr; fn f_up(&mut self, expr: Expr) -> Result> { - Ok(match expr { - 
Expr::BinaryExpr(BinaryExpr { - ref left, + if let Expr::BinaryExpr(BinaryExpr { + ref left, + op, + ref right, + }) = expr + { + if let Some(fun) = rewrite_array_concat_operator_to_func_for_column( + left.as_ref(), op, - ref right, - }) => { - if let Some(fun) = rewrite_array_concat_operator_to_func_for_column( - left.as_ref(), - op, - right.as_ref(), - self.schema.as_ref(), - )? - .or_else(|| { - rewrite_array_concat_operator_to_func( - left.as_ref(), - op, - right.as_ref(), - ) - }) { - // Convert &Box -> Expr - let left = (**left).clone(); - let right = (**right).clone(); - return Ok(Transformed::yes(Expr::ScalarFunction(ScalarFunction { - func_def: ScalarFunctionDefinition::BuiltIn(fun), - args: vec![left, right], - }))); - } - - Transformed::no(expr) + right.as_ref(), + self.schema.as_ref(), + )? + .or_else(|| { + rewrite_array_concat_operator_to_func(left.as_ref(), op, right.as_ref()) + }) { + // Convert &Box -> Expr + let left = (**left).clone(); + let right = (**right).clone(); + return Ok(Transformed::yes(Expr::ScalarFunction(ScalarFunction { + func_def: ScalarFunctionDefinition::BuiltIn(fun), + args: vec![left, right], + }))); } - _ => Transformed::no(expr), - }) + } + Ok(Transformed::no(expr)) } } diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 3aab62438fe5..b7f513727d39 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::ops::Deref; + use crate::analyzer::check_plan; use crate::utils::collect_subquery_cols; + use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{plan_err, Result}; use datafusion_expr::expr_rewriter::strip_outer_reference; @@ -25,7 +28,6 @@ use datafusion_expr::{ Aggregate, BinaryExpr, Cast, Expr, Filter, Join, JoinType, LogicalPlan, Operator, Window, }; -use std::ops::Deref; /// Do necessary check on subquery expressions and fail the invalid plan /// 1) Check whether the outer plan is in the allowed outer plans list to use subquery expressions, @@ -287,10 +289,9 @@ fn get_correlated_expressions(inner_plan: &LogicalPlan) -> Result> { .into_iter() .partition(|e| e.contains_outer()); - correlated - .into_iter() - .for_each(|expr| exprs.push(strip_outer_reference(expr.clone()))); - return Ok(TreeNodeRecursion::Continue); + for expr in correlated { + exprs.push(strip_outer_reference(expr.clone())); + } } Ok(TreeNodeRecursion::Continue) })?; diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 8c208fab9aa6..08f49ed15b09 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -19,8 +19,9 @@ use std::sync::Arc; -use arrow::datatypes::{DataType, IntervalUnit}; +use crate::analyzer::AnalyzerRule; +use arrow::datatypes::{DataType, IntervalUnit}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; use datafusion_common::{ @@ -50,8 +51,6 @@ use datafusion_expr::{ WindowFrameBound, WindowFrameUnits, }; -use crate::analyzer::AnalyzerRule; - #[derive(Default)] pub struct TypeCoercion {} @@ -753,31 +752,26 @@ mod test { use std::any::Any; use std::sync::{Arc, OnceLock}; - use arrow::array::{FixedSizeListArray, Int32Array}; - use arrow::datatypes::{DataType, TimeUnit}; + use crate::analyzer::type_coercion::{ + 
cast_expr, coerce_case_expression, TypeCoercion, TypeCoercionRewriter, + }; + use crate::test::assert_analyzed_plan_eq; - use arrow::datatypes::Field; - use datafusion_common::tree_node::TreeNode; + use arrow::array::{FixedSizeListArray, Int32Array}; + use arrow::datatypes::{DataType, Field, TimeUnit}; + use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::{DFField, DFSchema, DFSchemaRef, Result, ScalarValue}; use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction}; + use datafusion_expr::logical_plan::{EmptyRelation, Projection}; use datafusion_expr::{ - cast, col, concat, concat_ws, create_udaf, is_true, AccumulatorFactoryFunction, - AggregateFunction, AggregateUDF, BinaryExpr, BuiltinScalarFunction, Case, - ColumnarValue, ExprSchemable, Filter, Operator, ScalarUDFImpl, - SimpleAggregateUDF, Subquery, - }; - use datafusion_expr::{ - lit, - logical_plan::{EmptyRelation, Projection}, - Expr, LogicalPlan, ScalarUDF, Signature, Volatility, + cast, col, concat, concat_ws, create_udaf, is_true, lit, + AccumulatorFactoryFunction, AggregateFunction, AggregateUDF, BinaryExpr, + BuiltinScalarFunction, Case, ColumnarValue, Expr, ExprSchemable, Filter, + LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl, Signature, SimpleAggregateUDF, + Subquery, Volatility, }; use datafusion_physical_expr::expressions::AvgAccumulator; - use crate::analyzer::type_coercion::{ - cast_expr, coerce_case_expression, TypeCoercion, TypeCoercionRewriter, - }; - use crate::test::assert_analyzed_plan_eq; - fn empty() -> Arc { Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -1278,7 +1272,7 @@ mod test { std::collections::HashMap::new(), )?); let mut rewriter = TypeCoercionRewriter { schema }; - let result = expr.rewrite(&mut rewriter)?.data; + let result = expr.rewrite(&mut rewriter).data()?; let schema = Arc::new(DFSchema::new_with_metadata( vec![DFField::new_unqualified( @@ -1313,7 +1307,7 @@ mod test { let mut rewriter = TypeCoercionRewriter { schema }; let expr = is_true(lit(12i32).gt(lit(13i64))); let expected = is_true(cast(lit(12i32), DataType::Int64).gt(lit(13i64))); - let result = expr.rewrite(&mut rewriter)?.data; + let result = expr.rewrite(&mut rewriter).data()?; assert_eq!(expected, result); // eq @@ -1324,7 +1318,7 @@ mod test { let mut rewriter = TypeCoercionRewriter { schema }; let expr = is_true(lit(12i32).eq(lit(13i64))); let expected = is_true(cast(lit(12i32), DataType::Int64).eq(lit(13i64))); - let result = expr.rewrite(&mut rewriter)?.data; + let result = expr.rewrite(&mut rewriter).data()?; assert_eq!(expected, result); // lt @@ -1335,7 +1329,7 @@ mod test { let mut rewriter = TypeCoercionRewriter { schema }; let expr = is_true(lit(12i32).lt(lit(13i64))); let expected = is_true(cast(lit(12i32), DataType::Int64).lt(lit(13i64))); - let result = expr.rewrite(&mut rewriter)?.data; + let result = expr.rewrite(&mut rewriter).data()?; assert_eq!(expected, result); Ok(()) diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 323556ad7158..30c184a28e33 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -29,7 +29,8 @@ use datafusion_common::tree_node::{ TreeNodeVisitor, }; use datafusion_common::{ - internal_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, + internal_datafusion_err, internal_err, Column, DFField, DFSchema, DFSchemaRef, + DataFusionError, Result, }; use 
datafusion_expr::expr::Alias; use datafusion_expr::logical_plan::{ @@ -680,12 +681,9 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> { fn f_up(&mut self, expr: &Expr) -> Result { self.series_number += 1; - let (idx, sub_expr_desc) = - if let Some((idx, sub_expr_desc)) = self.pop_enter_mark() { - (idx, sub_expr_desc) - } else { - return Ok(TreeNodeRecursion::Continue); - }; + let Some((idx, sub_expr_desc)) = self.pop_enter_mark() else { + return Ok(TreeNodeRecursion::Continue); + }; // skip exprs should not be recognize. if self.expr_mask.ignores(expr) { self.id_array[idx].0 = self.series_number; @@ -787,7 +785,7 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> { self.curr_index += 1; // Skip sub-node of a replaced tree, or without identifier, or is not repeated expr. let expr_set_item = self.expr_set.get(id).ok_or_else(|| { - DataFusionError::Internal("expr_set invalid state".to_string()) + internal_datafusion_err!("expr_set invalid state") })?; if *series_number < self.max_series_number || id.is_empty() diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 09dccc1fc703..fd548ba4948e 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -15,19 +15,20 @@ // specific language governing permissions and limitations // under the License. +use std::collections::{BTreeSet, HashMap}; +use std::ops::Deref; + use crate::simplify_expressions::{ExprSimplifier, SimplifyContext}; use crate::utils::collect_subquery_cols; + use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; -use datafusion_common::{plan_err, Result}; -use datafusion_common::{Column, DFSchemaRef, ScalarValue}; +use datafusion_common::{plan_err, Column, DFSchemaRef, Result, ScalarValue}; use datafusion_expr::expr::{AggregateFunctionDefinition, Alias}; use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction}; use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_physical_expr::execution_props::ExecutionProps; -use std::collections::{BTreeSet, HashMap}; -use std::ops::Deref; /// This struct rewrite the sub query plan by pull up the correlated expressions(contains outer reference columns) from the inner subquery's 'Filter'. /// It adds the inner reference columns to the 'Projection' or 'Aggregate' of the subquery if they are missing, so that they can be evaluated by the parent operator as the join condition. @@ -396,8 +397,8 @@ fn agg_exprs_evaluation_result_on_empty_batch( _ => Transformed::no(expr), }; Ok(new_expr) - })? - .data; + }) + .data()?; let result_expr = result_expr.unalias(); let props = ExecutionProps::new(); @@ -432,8 +433,9 @@ fn proj_exprs_evaluation_result_on_empty_batch( } else { Ok(Transformed::no(expr)) } - })? - .data; + }) + .data()?; + if result_expr.ne(expr) { let props = ExecutionProps::new(); let info = SimplifyContext::new(&props).with_schema(schema.clone()); @@ -468,8 +470,9 @@ fn filter_exprs_evaluation_result_on_empty_batch( } else { Ok(Transformed::no(expr)) } - })? 
- .data; + }) + .data()?; + let pull_up_expr = if result_expr.ne(filter_expr) { let props = ExecutionProps::new(); let info = SimplifyContext::new(&props).with_schema(schema); diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 013ab83aaa95..b94cf37c5c12 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -15,12 +15,17 @@ // specific language governing permissions and limitations // under the License. +use std::collections::BTreeSet; +use std::ops::Deref; +use std::sync::Arc; + use crate::decorrelate::PullUpCorrelatedExpr; use crate::optimizer::ApplyOrder; use crate::utils::replace_qualified_name; use crate::{OptimizerConfig, OptimizerRule}; + use datafusion_common::alias::AliasGenerator; -use datafusion_common::tree_node::TreeNode; +use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::{plan_err, Result}; use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::expr_rewriter::create_col_from_scalar_expr; @@ -30,10 +35,8 @@ use datafusion_expr::{ exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter, LogicalPlan, LogicalPlanBuilder, Operator, }; + use log::debug; -use std::collections::BTreeSet; -use std::ops::Deref; -use std::sync::Arc; /// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins #[derive(Default)] @@ -228,7 +231,7 @@ fn build_join( collected_count_expr_map: Default::default(), pull_up_having_expr: None, }; - let new_plan = subquery.clone().rewrite(&mut pull_up)?.data; + let new_plan = subquery.clone().rewrite(&mut pull_up).data()?; if !pull_up.can_pull_up { return Ok(None); } @@ -321,8 +324,11 @@ impl SubqueryInfo { #[cfg(test)] mod tests { + use std::ops::Add; + use super::*; use crate::test::*; + use arrow::datatypes::DataType; use datafusion_common::Result; use datafusion_expr::{ @@ -330,7 +336,6 @@ mod tests { logical_plan::LogicalPlanBuilder, not_exists, not_in_subquery, or, out_ref_col, Operator, }; - use std::ops::Add; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq_display_indent( diff --git a/datafusion/optimizer/src/plan_signature.rs b/datafusion/optimizer/src/plan_signature.rs index 8b8814192d38..4143d52a053e 100644 --- a/datafusion/optimizer/src/plan_signature.rs +++ b/datafusion/optimizer/src/plan_signature.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, num::NonZeroUsize, }; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_expr::LogicalPlan; /// Non-unique identifier of a [`LogicalPlan`]. diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 4501733b00a6..8acc36e479ca 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -15,21 +15,23 @@ // specific language governing permissions and limitations // under the License. 
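The `?.data` -> `.data()?` conversions above, and throughout the rest of this patch, rely on the `TransformedResult` extension trait. Below is a minimal sketch of that idiom using simplified stand-ins for the `datafusion_common::tree_node` types (the real `Transformed` also carries a `TreeNodeRecursion` state); the payoff is purely ergonomic: the `?` moves to the end of the chain, so multi-step rewrites read left to right.

type Result<T> = std::result::Result<T, String>;

struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self {
        Self { data, transformed: true }
    }
    fn no(data: T) -> Self {
        Self { data, transformed: false }
    }
}

// The helper trait: implemented on `Result<Transformed<T>>` so a fallible
// rewrite can be unwrapped at the end of a method chain with `.data()?`.
trait TransformedResult<T> {
    fn data(self) -> Result<T>;
}

impl<T> TransformedResult<T> for Result<Transformed<T>> {
    fn data(self) -> Result<T> {
        self.map(|t| t.data)
    }
}

fn negate_if_negative(x: i32) -> Result<Transformed<i32>> {
    Ok(if x < 0 { Transformed::yes(-x) } else { Transformed::no(x) })
}

fn main() -> Result<()> {
    // Old shape: `negate_if_negative(-4)?.data` interrupts the chain with `?`.
    // New shape: unwrap the payload once, at the end of the chain.
    let value = negate_if_negative(-4).data()?;
    assert_eq!(value, 4);

    // The `transformed` flag still reports whether a rewrite happened.
    let untouched = negate_if_negative(7)?;
    assert!(!untouched.transformed);
    Ok(())
}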
+use std::collections::{BTreeSet, HashMap}; +use std::sync::Arc; + use crate::decorrelate::{PullUpCorrelatedExpr, UN_MATCHED_ROW_INDICATOR}; use crate::optimizer::ApplyOrder; use crate::utils::replace_qualified_name; use crate::{OptimizerConfig, OptimizerRule}; + use datafusion_common::alias::AliasGenerator; use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; use datafusion_common::{plan_err, Column, Result, ScalarValue}; use datafusion_expr::expr_rewriter::create_col_from_scalar_expr; use datafusion_expr::logical_plan::{JoinType, Subquery}; use datafusion_expr::utils::conjunction; use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder}; -use std::collections::{BTreeSet, HashMap}; -use std::sync::Arc; /// Optimizer rule for rewriting subquery filters to joins #[derive(Default)] @@ -56,8 +58,11 @@ impl ScalarSubqueryToJoin { sub_query_info: vec![], alias_gen, }; - let new_expr = predicate.clone().rewrite(&mut extract)?.data; - Ok((extract.sub_query_info, new_expr)) + predicate + .clone() + .rewrite(&mut extract) + .data() + .map(|new_expr| (extract.sub_query_info, new_expr)) } } @@ -100,8 +105,8 @@ impl OptimizerRule for ScalarSubqueryToJoin { } else { Ok(Transformed::no(expr)) } - })? - .data; + }) + .data()?; } cur_input = optimized_subquery; } else { @@ -157,8 +162,8 @@ impl OptimizerRule for ScalarSubqueryToJoin { } else { Ok(Transformed::no(expr)) } - })? - .data; + }) + .data()?; expr_to_rewrite_expr_map.insert(expr, new_expr); } } @@ -283,7 +288,7 @@ fn build_join( collected_count_expr_map: Default::default(), pull_up_having_expr: None, }; - let new_plan = subquery_plan.clone().rewrite(&mut pull_up)?.data; + let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?; if !pull_up.can_pull_up { return Ok(None); } @@ -372,15 +377,17 @@ fn build_join( #[cfg(test)] mod tests { + use std::ops::Add; + use super::*; use crate::test::*; + use arrow::datatypes::DataType; use datafusion_common::Result; + use datafusion_expr::logical_plan::LogicalPlanBuilder; use datafusion_expr::{ - col, lit, logical_plan::LogicalPlanBuilder, max, min, out_ref_col, - scalar_subquery, sum, Between, + col, lit, max, min, out_ref_col, scalar_subquery, sum, Between, }; - use std::ops::Add; /// Test multiple correlated subqueries #[test] diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 9b0224cafd2a..6b5dd1b4681e 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -19,16 +19,21 @@ use std::ops::Not; +use super::inlist_simplifier::{InListSimplifier, ShortenInListSimplifier}; +use super::utils::*; +use crate::analyzer::type_coercion::TypeCoercionRewriter; +use crate::simplify_expressions::guarantees::GuaranteeRewriter; +use crate::simplify_expressions::regex::simplify_regex_expr; +use crate::simplify_expressions::SimplifyInfo; + use arrow::{ array::{new_null_array, AsArray}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; - -use datafusion_common::tree_node::{Transformed, TransformedResult}; use datafusion_common::{ cast::{as_large_list_array, as_list_array}, - tree_node::{TreeNode, TreeNodeRewriter}, + tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter}, }; use datafusion_common::{ internal_err, DFSchema, DFSchemaRef, 
DataFusionError, Result, ScalarValue, @@ -40,14 +45,6 @@ use datafusion_expr::{ use datafusion_expr::{expr::ScalarFunction, interval_arithmetic::NullableInterval}; use datafusion_physical_expr::{create_physical_expr, execution_props::ExecutionProps}; -use crate::analyzer::type_coercion::TypeCoercionRewriter; -use crate::simplify_expressions::guarantees::GuaranteeRewriter; -use crate::simplify_expressions::regex::simplify_regex_expr; -use crate::simplify_expressions::SimplifyInfo; - -use super::inlist_simplifier::{InListSimplifier, ShortenInListSimplifier}; -use super::utils::*; - /// This structure handles API for expression simplification pub struct ExprSimplifier { info: S, @@ -132,36 +129,34 @@ impl ExprSimplifier { /// let expr = simplifier.simplify(expr).unwrap(); /// assert_eq!(expr, b_lt_2); /// ``` - pub fn simplify(&self, expr: Expr) -> Result { + pub fn simplify(&self, mut expr: Expr) -> Result { let mut simplifier = Simplifier::new(&self.info); let mut const_evaluator = ConstEvaluator::try_new(self.info.execution_props())?; let mut shorten_in_list_simplifier = ShortenInListSimplifier::new(); let mut inlist_simplifier = InListSimplifier::new(); let mut guarantee_rewriter = GuaranteeRewriter::new(&self.guarantees); - let expr = if self.canonicalize { - expr.rewrite(&mut Canonicalizer::new())?.data - } else { - expr - }; + if self.canonicalize { + expr = expr.rewrite(&mut Canonicalizer::new()).data()? + } // TODO iterate until no changes are made during rewrite // (evaluating constants can enable new simplifications and // simplifications can enable new constant evaluation) // https://github.com/apache/arrow-datafusion/issues/1160 - expr.rewrite(&mut const_evaluator)? - .data - .rewrite(&mut simplifier)? - .data - .rewrite(&mut inlist_simplifier)? - .data - .rewrite(&mut shorten_in_list_simplifier)? - .data - .rewrite(&mut guarantee_rewriter)? - .data + expr.rewrite(&mut const_evaluator) + .data()? + .rewrite(&mut simplifier) + .data()? + .rewrite(&mut inlist_simplifier) + .data()? + .rewrite(&mut shorten_in_list_simplifier) + .data()? + .rewrite(&mut guarantee_rewriter) + .data()? // run both passes twice to try an minimize simplifications that we missed - .rewrite(&mut const_evaluator)? - .data + .rewrite(&mut const_evaluator) + .data()? 
.rewrite(&mut simplifier) .data() } @@ -1372,16 +1367,15 @@ mod tests { sync::Arc, }; + use super::*; + use crate::simplify_expressions::SimplifyContext; + use crate::test::test_table_scan_with_name; + use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{assert_contains, DFField, ToDFSchema}; use datafusion_expr::{interval_arithmetic::Interval, *}; use datafusion_physical_expr::execution_props::ExecutionProps; - use crate::simplify_expressions::SimplifyContext; - use crate::test::test_table_scan_with_name; - - use super::*; - // ------------------------------ // --- ExprSimplifier tests ----- // ------------------------------ diff --git a/datafusion/optimizer/src/simplify_expressions/guarantees.rs b/datafusion/optimizer/src/simplify_expressions/guarantees.rs index 9f8553cb0cc2..6eb583257dcb 100644 --- a/datafusion/optimizer/src/simplify_expressions/guarantees.rs +++ b/datafusion/optimizer/src/simplify_expressions/guarantees.rs @@ -21,8 +21,8 @@ use std::{borrow::Cow, collections::HashMap}; -use datafusion_common::tree_node::Transformed; -use datafusion_common::{tree_node::TreeNodeRewriter, DataFusionError, Result}; +use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::interval_arithmetic::{Interval, NullableInterval}; use datafusion_expr::{expr::InList, lit, Between, BinaryExpr, Expr}; @@ -204,8 +204,8 @@ mod tests { use super::*; use arrow::datatypes::DataType; - use datafusion_common::tree_node::TransformedResult; - use datafusion_common::{tree_node::TreeNode, ScalarValue}; + use datafusion_common::tree_node::{TransformedResult, TreeNode}; + use datafusion_common::ScalarValue; use datafusion_expr::{col, lit, Operator}; #[test] diff --git a/datafusion/optimizer/src/simplify_expressions/inlist_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/inlist_simplifier.rs index 8cbb321c2755..fa1d7cfc1239 100644 --- a/datafusion/optimizer/src/simplify_expressions/inlist_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/inlist_simplifier.rs @@ -17,6 +17,9 @@ //! This module implements a rule that simplifies the values for `InList`s +use super::utils::{is_null, lit_bool_null}; +use super::THRESHOLD_INLINE_INLIST; + use std::borrow::Cow; use std::collections::HashSet; @@ -25,9 +28,6 @@ use datafusion_common::{Result, ScalarValue}; use datafusion_expr::expr::{InList, InSubquery}; use datafusion_expr::{lit, BinaryExpr, Expr, Operator}; -use super::utils::{is_null, lit_bool_null}; -use super::THRESHOLD_INLINE_INLIST; - pub(super) struct ShortenInListSimplifier {} impl ShortenInListSimplifier { diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 9cc34c9b1611..196a35ee9ae8 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -18,8 +18,13 @@ //! Unwrap-cast binary comparison rule can be used to the binary/inlist comparison expr now, and other type //! of expr can be added if needed. //! This rule can reduce adding the `Expr::Cast` the expr instead of adding the `Expr::Cast` to literal expr. 
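The rule summarized in the comment above moves a cast off the column side of a comparison and onto the literal, so the cast runs once at plan time rather than once per row. A hand-rolled toy illustration of that idea follows; the enum is a stand-in for demonstration, not DataFusion's `Expr`.

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    ColumnI32,                // an Int32 column reference
    LitI32(i32),              // an Int32 literal
    LitI64(i64),              // an Int64 literal
    CastToI64(Box<Expr>),     // CAST(<expr> AS BIGINT)
    Gt(Box<Expr>, Box<Expr>), // <left> > <right>
}

// Rewrites `CAST(col AS BIGINT) > lit_i64` into `col > lit_i32` when the
// literal fits into Int32: the cast is applied once to the literal during
// planning instead of to the column for every row during execution.
fn unwrap_cast_in_gt(expr: Expr) -> Expr {
    if let Expr::Gt(left, right) = &expr {
        if let (Expr::CastToI64(inner), Expr::LitI64(v)) =
            (left.as_ref(), right.as_ref())
        {
            if let Ok(small) = i32::try_from(*v) {
                return Expr::Gt(inner.clone(), Box::new(Expr::LitI32(small)));
            }
        }
    }
    expr
}

fn main() {
    let before = Expr::Gt(
        Box::new(Expr::CastToI64(Box::new(Expr::ColumnI32))),
        Box::new(Expr::LitI64(10)),
    );
    let after = unwrap_cast_in_gt(before);
    assert_eq!(
        after,
        Expr::Gt(Box::new(Expr::ColumnI32), Box::new(Expr::LitI32(10)))
    );
}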
+ +use std::cmp::Ordering; +use std::sync::Arc; + use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; + use arrow::datatypes::{ DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION, }; @@ -32,8 +37,6 @@ use datafusion_expr::utils::merge_schema; use datafusion_expr::{ binary_expr, in_list, lit, Expr, ExprSchemable, LogicalPlan, Operator, }; -use std::cmp::Ordering; -use std::sync::Arc; /// [`UnwrapCastInComparison`] attempts to remove casts from /// comparisons to literals ([`ScalarValue`]s) by applying the casts @@ -472,15 +475,17 @@ fn cast_between_timestamp(from: DataType, to: DataType, value: i128) -> Option`]s that are known /// to have the same value for all tuples in a relation. These are generated by @@ -479,15 +481,14 @@ impl EquivalenceGroup { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::equivalence::tests::create_test_params; use crate::equivalence::{EquivalenceClass, EquivalenceGroup}; - use crate::expressions::lit; - use crate::expressions::Column; - use crate::expressions::Literal; - use datafusion_common::Result; - use datafusion_common::ScalarValue; - use std::sync::Arc; + use crate::expressions::{lit, Column, Literal}; + + use datafusion_common::{Result, ScalarValue}; #[test] fn test_bridge_groups() -> Result<()> { diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index 6b928ea24c6b..46909f23616f 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -15,18 +15,22 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + +use crate::expressions::Column; +use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; + +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; + mod class; mod ordering; mod projection; mod properties; -use crate::expressions::Column; -use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; + pub use class::{EquivalenceClass, EquivalenceGroup}; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; pub use ordering::OrderingEquivalenceClass; pub use projection::ProjectionMapping; pub use properties::{join_equivalence_properties, EquivalenceProperties}; -use std::sync::Arc; /// This function constructs a duplicate-free `LexOrderingReq` by filtering out /// duplicate entries that have same physical expression inside. 
For example, @@ -62,19 +66,22 @@ pub fn add_offset_to_expr( #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::expressions::{col, Column}; use crate::PhysicalSortExpr; + use arrow::compute::{lexsort_to_indices, SortColumn}; use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, Float64Array, RecordBatch, UInt32Array}; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::{plan_datafusion_err, Result}; + use itertools::izip; use rand::rngs::StdRng; use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; - use std::sync::Arc; pub fn output_schema( mapping: &ProjectionMapping, diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index ad1f754a96d1..c60742c724e4 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -109,6 +109,8 @@ impl ProjectionMapping { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::equivalence::tests::{ apply_projection, convert_to_orderings, convert_to_orderings_owned, @@ -120,12 +122,13 @@ mod tests { use crate::expressions::{col, BinaryExpr, Literal}; use crate::functions::create_physical_expr; use crate::PhysicalSortExpr; + use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{SortOptions, TimeUnit}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{BuiltinScalarFunction, Operator}; + use itertools::Itertools; - use std::sync::Arc; #[test] fn project_orderings() -> Result<()> { diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 88550813fe23..f234a1fa08cd 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -15,11 +15,6 @@ // specific language governing permissions and limitations // under the License. -use crate::expressions::CastExpr; -use arrow_schema::SchemaRef; -use datafusion_common::{JoinSide, JoinType, Result}; -use indexmap::{IndexMap, IndexSet}; -use itertools::Itertools; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -27,7 +22,7 @@ use super::ordering::collapse_lex_ordering; use crate::equivalence::{ collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; -use crate::expressions::Literal; +use crate::expressions::{CastExpr, Literal}; use crate::sort_properties::{ExprOrdering, SortProperties}; use crate::{ physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, @@ -35,8 +30,12 @@ use crate::{ PhysicalSortRequirement, }; -use arrow_schema::SortOptions; +use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{JoinSide, JoinType, Result}; + +use indexmap::{IndexMap, IndexSet}; +use itertools::Itertools; /// A `EquivalenceProperties` object stores useful information related to a schema. 
/// Currently, it keeps track of: @@ -1298,10 +1297,12 @@ mod tests { use crate::expressions::{col, BinaryExpr, Column}; use crate::functions::create_physical_expr; use crate::PhysicalSortExpr; + use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{Fields, SortOptions, TimeUnit}; use datafusion_common::Result; use datafusion_expr::{BuiltinScalarFunction, Operator}; + use itertools::Itertools; #[test] diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index e6ce8316c27e..609349509b86 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -19,18 +19,18 @@ use std::borrow::Cow; use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; -use crate::expressions::try_cast; -use crate::expressions::NoOp; +use crate::expressions::{try_cast, NoOp}; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; + use arrow::array::*; use arrow::compute::kernels::cmp::eq; use arrow::compute::kernels::zip::zip; use arrow::compute::{and, is_null, not, nullif, or, prep_null_mask_filter}; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{cast::as_boolean_array, internal_err, DataFusionError, Result}; -use datafusion_common::{exec_err, ScalarValue}; +use datafusion_common::cast::as_boolean_array; +use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::ColumnarValue; use itertools::Itertools; @@ -414,17 +414,15 @@ pub fn case( #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; - use crate::expressions::lit; - use crate::expressions::{binary, cast}; + use crate::expressions::{binary, cast, col, lit}; + use arrow::array::StringArray; use arrow::buffer::Buffer; use arrow::datatypes::DataType::Float64; use arrow::datatypes::*; use datafusion_common::cast::{as_float64_array, as_int32_array}; - use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; - use datafusion_common::ScalarValue; + use datafusion_common::{plan_err, ScalarValue}; use datafusion_expr::type_coercion::binary::comparison_coercion; use datafusion_expr::Operator; diff --git a/datafusion/physical-expr/src/tree_node.rs b/datafusion/physical-expr/src/tree_node.rs index 0e2aa7d63679..42dc6673af6a 100644 --- a/datafusion/physical-expr/src/tree_node.rs +++ b/datafusion/physical-expr/src/tree_node.rs @@ -63,8 +63,7 @@ impl ExprContext { pub fn update_expr_from_children(mut self) -> Result { let children_expr = self.children.iter().map(|c| c.expr.clone()).collect(); - let t = with_new_children_if_necessary(self.expr, children_expr)?; - self.expr = t; + self.expr = with_new_children_if_necessary(self.expr, children_expr)?; Ok(self) } } diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 0b0dca6bb4b6..b8e99403d695 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -130,10 +130,10 @@ pub fn get_indices_of_exprs_strict>>( pub type ExprTreeNode = ExprContext>; -/// This struct is used to convert a [PhysicalExpr] tree into a DAEG (i.e. an expression +/// This struct is used to convert a [`PhysicalExpr`] tree into a DAEG (i.e. an expression /// DAG) by collecting identical expressions in one node. 
Caller specifies the node type /// in the DAEG via the `constructor` argument, which constructs nodes in the DAEG from -/// the [ExprTreeNode] ancillary object. +/// the [`ExprTreeNode`] ancillary object. struct PhysicalExprDAEGBuilder<'a, T, F: Fn(&ExprTreeNode) -> Result> { // The resulting DAEG (expression DAG). graph: StableGraph, @@ -196,8 +196,8 @@ where }; // Use the builder to transform the expression tree node into a DAG. let root = init - .transform_up_mut(&mut |node| builder.mutate(node))? - .data; + .transform_up_mut(&mut |node| builder.mutate(node)) + .data()?; // Return a tuple containing the root node index and the DAG. Ok((root.data.unwrap(), builder.graph)) } diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 3484ee45ba6a..9824c723d9d1 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -31,7 +31,7 @@ use arrow::compute::concat_batches; use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch}; use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder}; use arrow_schema::{Schema, SchemaRef}; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ arrow_datafusion_err, plan_datafusion_err, DataFusionError, JoinSide, Result, ScalarValue, @@ -292,8 +292,8 @@ pub fn convert_sort_expr_with_filter_schema( None => Transformed::no(p), } }) - })? - .data; + }) + .data()?; // Search the converted `PhysicalExpr` in filter expression; if an exact // match is found, use this sorted expression in graph traversals. if check_filter_expr_contains_sort_information( diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 083c2f03be7b..1cb2b100e2d6 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -39,6 +39,7 @@ use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray}; use arrow_buffer::ArrowNativeType; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult, }; @@ -50,7 +51,6 @@ use datafusion_physical_expr::{ LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, }; -use datafusion_common::tree_node::{Transformed, TreeNode}; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use hashbrown::raw::RawTable; @@ -484,8 +484,8 @@ fn replace_on_columns_of_right_ordering( } else { Ok(Transformed::no(e)) } - })? - .data; + }) + .data()?; item.expr = new_expr; } } diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 9d4edd6c0de4..a9a7070023ab 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -661,7 +661,7 @@ pub fn with_new_children_if_necessary( .zip(old_children.iter()) .any(|(c1, c2)| !Arc::data_ptr_eq(c1, c2)) { - Ok(plan.with_new_children(children)?) 
+ plan.with_new_children(children) } else { Ok(plan) } diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 6fd88160468c..52a52f81bdaf 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -63,8 +63,7 @@ impl PlanContext { pub fn update_plan_from_children(mut self) -> Result { let children_plans = self.children.iter().map(|c| c.plan.clone()).collect(); - let t = with_new_children_if_necessary(self.plan, children_plans)?; - self.plan = t; + self.plan = with_new_children_if_necessary(self.plan, children_plans)?; Ok(self) } } diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index d6f53a73dcb1..abb896ab113e 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -17,19 +17,19 @@ //! SQL Utility Functions +use std::collections::HashMap; + use arrow_schema::{ DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, }; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use sqlparser::ast::Ident; - -use datafusion_common::{exec_err, internal_err, plan_err}; -use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_common::{ + exec_err, internal_err, plan_err, DataFusionError, Result, ScalarValue, +}; use datafusion_expr::expr::{Alias, GroupingSet, WindowFunction}; -use datafusion_expr::expr_vec_fmt; use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; -use datafusion_expr::{Expr, LogicalPlan}; -use std::collections::HashMap; +use datafusion_expr::{expr_vec_fmt, Expr, LogicalPlan}; +use sqlparser::ast::Ident; /// Make a best-effort attempt at resolving all columns in the expression tree pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 79877fa421e3..906926a5a9ab 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3214,7 +3214,7 @@ JOIN sales_global AS e ON s.currency = e.currency AND s.ts >= e.ts GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency -ORDER BY s.sn +ORDER BY s.sn, s.zip_code ---- 0 GRC 0 2022-01-01T06:00:00 EUR 30 0 GRC 4 2022-01-03T10:00:00 EUR 80
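Several hunks above (e.g. `LogicalPlan::map_children`) lean on `map_until_stop_and_collect` when rewriting a node's children. The sketch below shows its early-exit contract with simplified stand-in types; the real trait lives in `datafusion_common::tree_node` and threads the full `TreeNodeRecursion` state, which this sketch reduces to `Continue`/`Stop`.

type Result<T> = std::result::Result<T, String>;

#[derive(Clone, Copy, Debug, PartialEq)]
enum TreeNodeRecursion {
    Continue,
    Stop,
}

struct Transformed<T> {
    data: T,
    transformed: bool,
    tnr: TreeNodeRecursion,
}

// Applies `f` to each item, collecting results, until some call reports
// `Stop`; any remaining items are collected untouched.
fn map_until_stop_and_collect<T>(
    items: Vec<T>,
    mut f: impl FnMut(T) -> Result<Transformed<T>>,
) -> Result<Transformed<Vec<T>>> {
    let mut tnr = TreeNodeRecursion::Continue;
    let mut transformed = false;
    let mut out = Vec::with_capacity(items.len());
    for item in items {
        if tnr == TreeNodeRecursion::Stop {
            // After a `Stop`, remaining children pass through unchanged.
            out.push(item);
        } else {
            let t = f(item)?;
            tnr = t.tnr;
            transformed |= t.transformed;
            out.push(t.data);
        }
    }
    Ok(Transformed { data: out, transformed, tnr })
}

fn main() -> Result<()> {
    // Double each value until a result exceeds 10, then stop rewriting.
    let result = map_until_stop_and_collect(vec![3, 6, 9], |x| {
        let doubled = x * 2;
        let tnr = if doubled > 10 {
            TreeNodeRecursion::Stop
        } else {
            TreeNodeRecursion::Continue
        };
        Ok(Transformed { data: doubled, transformed: true, tnr })
    })?;
    assert_eq!(result.data, vec![6, 12, 9]); // 9 is left as-is after the stop
    assert_eq!(result.tnr, TreeNodeRecursion::Stop);
    assert!(result.transformed);
    Ok(())
}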