From ffab86a990b65363ceb8aa5e62a0de1a1362518c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Apr 2024 07:12:59 -0400 Subject: [PATCH] improve dos --- datafusion/common/src/tree_node.rs | 68 ++++++++++++++------- datafusion/expr/src/logical_plan/mutate.rs | 70 ++++++++++------------ 2 files changed, 77 insertions(+), 61 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 54611558c9e7a..b0f7686963f79 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -100,6 +100,10 @@ pub trait TreeNode: Sized { /// Visit the tree node using the given [`TreeNodeVisitor`], performing a /// depth-first walk of the node and its children. /// + /// See also: + /// * [`Self::mutate`] to rewrite `TreeNode`s in place + /// * [`Self::rewrite`] to rewrite owned `TreeNode`s + /// /// Consider the following tree structure: /// ```text /// ParentNode @@ -144,6 +148,10 @@ pub trait TreeNode: Sized { /// Implements the [visitor pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for /// recursively transforming [`TreeNode`]s. /// + /// See also: + /// * [`Self::mutate`] to rewrite `TreeNode`s in place + /// * [`Self::visit`] for inspecting (without modification) `TreeNode`s + /// /// Consider the following tree structure: /// ```text /// ParentNode @@ -175,7 +183,11 @@ pub trait TreeNode: Sized { } /// Implements the [visitor pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for - /// recursively mutating / rewriting [`TreeNode`]s in place + /// recursively mutating / rewriting [`TreeNode`]s in place. 
+ /// + /// See also: + /// * [`Self::rewrite`] to rewrite owned `TreeNode`s + /// * [`Self::visit`] for inspecting (without modification) `TreeNode`s /// /// Consider the following tree structure: /// ```text @@ -184,7 +196,7 @@ pub trait TreeNode: Sized { /// right: ChildNode2 /// ``` /// - /// Here, the nodes would be mutated using the following order: + /// Here, the nodes would be mutated in the following order: /// ```text /// TreeNodeMutator::f_down(ParentNode) /// TreeNodeMutator::f_down(ChildNode1) @@ -422,13 +434,17 @@ pub trait TreeNode: Sized { /// Rewrite the node's children in place using `F`. /// - /// Using [`Self::map_children`], the owned API, is more ideomatic and - /// has clearer semantics on error (the node is consumed). However, it requires - /// copying the interior fields of the tree node during rewrite + /// On error, `self` is left partially rewritten. + /// + /// # Notes + /// + /// Using [`Self::map_children`], the owned API, has clearer semantics on + /// error (the node is consumed). However, it requires copying the interior + /// fields of the tree node during rewrite. /// /// This API writes the nodes in place, which can be faster as it avoids - /// copying. However, one downside is that the tree node can be left in an - /// partially rewritten state when an error occurs. + /// copying, but leaves the tree node in a partially rewritten state when + /// an error occurs. 
fn mutate_children Result>>( &mut self, _f: F, @@ -492,30 +508,35 @@ pub trait TreeNodeRewriter: Sized { } } -/// Trait for potentially rewriting tree of [`TreeNode`]s in place +/// Trait for mutating (rewriting in place) [`TreeNode`]s /// -/// See [`TreeNodeRewriter`] for rewriting owned tree ndoes -/// See [`TreeNodeVisitor`] for visiting, but not changing, tree nodes +/// # See Also: +/// * [`TreeNodeRewriter`] for rewriting owned `TreeNode`s +/// * [`TreeNodeVisitor`] for visiting, but not changing, `TreeNode`s pub trait TreeNodeMutator: Sized { - /// The node type to rewrite. + /// The node type to mutate. type Node: TreeNode; - /// Invoked while traversing down the tree before any children are rewritten. - /// Default implementation returns the node as is and continues recursion. + /// Invoked while traversing down the tree before any children are mutated. + /// Default implementation does nothing to the node and continues recursion. + /// + /// # Notes /// - /// Since this mutates the nodes in place, the returned Transformed object + /// As the node may be mutated in place, the returned [`Transformed`] object /// returns `()` (no data). /// /// If the node's children are changed by `f_down`, the *new* children are - /// visited, not the original. + /// visited, not the original children. fn f_down(&mut self, _node: &mut Self::Node) -> Result> { Ok(Transformed::no(())) } - /// Invoked while traversing up the tree after all children have been rewritten. - /// Default implementation returns the node as is and continues recursion. + /// Invoked while traversing up the tree after all children have been mutated. + /// Default implementation does nothing to the node and continues recursion. + /// + /// # Notes /// - /// Since this mutates the nodes in place, the returned Transformed object + /// As the node may be mutated in place, the returned [`Transformed`] object /// returns `()` (no data). 
fn f_up(&mut self, _node: &mut Self::Node) -> Result> { Ok(Transformed::no(())) @@ -603,7 +624,7 @@ impl Transformed { /// Invokes f(), depending on the value of self.tnr. /// /// This is used to conditionally apply a function during a f_up tree - /// traversal, if the result of children traversal was `Continue`. + /// traversal, if the result of children traversal was [`TreeNodeRecursion::Continue`]. /// /// Handling [`TreeNodeRecursion::Continue`] and [`TreeNodeRecursion::Stop`] /// is straightforward, but [`TreeNodeRecursion::Jump`] can behave differently @@ -650,11 +671,12 @@ impl Transformed { impl Transformed<()> { /// Invoke the given function `f` and combine the transformed state with - /// the current state, + /// the current state: + /// + /// * if `f` returns an Err, returns that err /// - /// if f() returns an Err, returns that err - /// If f() returns Ok, returns a true transformed flag if either self or - /// the result of f() was transformed + /// * If `f` returns Ok, sets `self.transformed` to `true` if either self or + /// the result of `f` were transformed. 
pub fn and_then(self, f: F) -> Result> where F: FnOnce() -> Result>, diff --git a/datafusion/expr/src/logical_plan/mutate.rs b/datafusion/expr/src/logical_plan/mutate.rs index 3c3e5a6e2e098..da2a4d6d6b65e 100644 --- a/datafusion/expr/src/logical_plan/mutate.rs +++ b/datafusion/expr/src/logical_plan/mutate.rs @@ -24,11 +24,11 @@ use datafusion_common::{Column, DFSchema, DFSchemaRef}; use std::sync::{Arc, OnceLock}; impl LogicalPlan { - /// applies the closure `f` to each expression of this node, potentially - /// rewriting it in place + /// applies `f` to each expression of this node, potentially rewriting it in + /// place /// - /// If the closure returns an error, the error is returned and the expressions - /// are left in a partially modified state + /// If `f` returns an error, the error is returned and the expressions are + /// left in a partially modified state pub fn rewrite_exprs(&mut self, mut f: F) -> Result> where F: FnMut(&mut Expr) -> Result>, @@ -66,7 +66,6 @@ impl LogicalPlan { // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`. // 2. the second part is non-equijoin(filter). LogicalPlan::Join(Join { on, filter, .. }) => { - // don't look at the equijoin expressions as a whole let exprs = on .iter_mut() .flat_map(|(e1, e2)| std::iter::once(e1).chain(std::iter::once(e2))); @@ -88,10 +87,7 @@ impl LogicalPlan { LogicalPlan::TableScan(TableScan { filters, .. }) => { rewrite_expr_iter_mut(filters.iter_mut(), f) } - LogicalPlan::Unnest(Unnest { column, .. }) => { - // it would be really nice to avoid this and instead have unnest have Expr - rewrite_column(column, f) - } + LogicalPlan::Unnest(Unnest { column, .. }) => rewrite_column(column, f), LogicalPlan::Distinct(Distinct::On(DistinctOn { on_expr, select_expr, @@ -125,12 +121,14 @@ impl LogicalPlan { } } - /// applies the closure `f` to each input of this node, in place. + /// applies `f` to each input of this node, rewriting them in place. 
/// - /// Inputs include both direct children as well as any embedded - /// `LogicalPlan`s embedded in `Expr::Exists`, etc. + /// # Notes + /// Inputs include both direct children as well as any embedded subquery + /// `LogicalPlan`s, such as those in [`Expr::Exists`]. /// - /// If Err is returned, the inputs may be in a partially modified state + /// If `f` returns an `Err`, that Err is returned, and the inputs are left + /// in a partially modified state pub fn rewrite_inputs(&mut self, mut f: F) -> Result> where F: FnMut(&mut LogicalPlan) -> Result>, @@ -202,7 +200,7 @@ impl LogicalPlan { children_result.and_then(|| self.rewrite_subqueries(&mut f)) } - /// applies the closure `f` to LogicalPlans in any subquery expressions + /// applies `f` to LogicalPlans in any subquery expressions /// /// If Err is returned, the plan may be left in a partially modified state fn rewrite_subqueries(&mut self, mut f: F) -> Result> @@ -220,7 +218,7 @@ impl LogicalPlan { } } -/// writes each element in the iterator using `f` +/// rewrites each `&mut Expr` in the iterator using `f` fn rewrite_expr_iter_mut<'a, F>( i: impl IntoIterator, mut f: F, @@ -233,17 +231,18 @@ where } /// A temporary node that is left in place while rewriting the children of a -/// logical plan to ensure that the logical plan is always in a valid state +/// [`LogicalPlan`]. This is necessary to ensure that the `LogicalPlan` is +/// always in a valid state (from the Rust perspective) static PLACEHOLDER: OnceLock> = OnceLock::new(); -/// Applies f() to rewrite each child of the logical plan, replacing the existing -/// node with the result, trying to avoid clone'ing. +/// Applies `f` to rewrite the existing node, while avoiding `clone`ing as much +/// as possible. 
/// -/// TODO we would remove the Arc nonsense entirely from LogicalPlan -/// and have it own its inputs however, for now do a horrible hack and swap out the value -/// of the Arc to avoid cloning the entire plan +/// TODO eventually remove `Arc` from `LogicalPlan` and have it own +/// its inputs, so this code would not be needed. However, for now we try and +/// unwrap the `Arc` which avoids `clone`ing in most cases. /// -/// On error, the node will be partially rewritten (left with a placeholder logical plan) +/// On error, `node` will be left with a placeholder logical plan fn rewrite_arc(node: &mut Arc, mut f: F) -> Result> where F: FnMut(&mut LogicalPlan) -> Result>, @@ -263,16 +262,11 @@ where std::mem::swap(node, &mut new_node); // try to update existing node, if it isn't shared with others - let mut new_node = match Arc::try_unwrap(new_node) { - Ok(node) => { - //println!("Unwrapped arc yay"); - node - } - Err(node) => { - //println!("Failed to unwrap arc boo"); - node.as_ref().clone() - } - }; + let mut new_node = Arc::try_unwrap(new_node) + // if `Err` is returned, there is another reference to this + // LogicalPlan, so we must clone instead + .unwrap_or_else(|node| node.as_ref().clone()); + // apply the actual transform let result = f(&mut new_node)?; @@ -283,24 +277,23 @@ where Ok(result) } -/// Rewrites a `Column` in place using the provided closure +/// Rewrites a [`Column`] in place using the provided closure fn rewrite_column(column: &mut Column, mut f: F) -> Result> where F: FnMut(&mut Expr) -> Result>, { - // Column's isn't an Expr to visit, but the closure is to rewrite Exprs. 
- // So we need to make a temporary Expr to rewrite and then put it bac, + // Since `Column` isn't an `Expr`, but the closure is in terms of `Expr`s, + // we make a temporary Expr to rewrite and then put it back let mut swap_column = Column::new_unqualified("TEMP_unnest_column"); std::mem::swap(column, &mut swap_column); let mut expr = Expr::Column(swap_column); let result = f(&mut expr)?; - // put the column back + // Get the rewritten column let Expr::Column(mut swap_column) = expr else { return internal_err!( - "Rewrite of Unnest expr must return Column, returned {:?}", - expr + "Rewrite of Column Expr must return Column, returned {expr:?}" ); }; // put the rewritten column back @@ -310,6 +303,7 @@ where /// Rewrites all expressions for an Extension node "in place" /// (it currently has to copy values because there are no APIs for in place modification) +/// TODO file ticket for in-place modification of Extension nodes /// /// Should be removed when we have an API for in place modifications of the /// extension to avoid these copies