Skip to content

Commit

Permalink
don't trust t.transformed coming from transformation closures, keep…
Browse files Browse the repository at this point in the history
… the old way of detecting if changes were made
  • Loading branch information
peter-toth committed Feb 1, 2024
1 parent 84d91c6 commit 3653aa6
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 25 deletions.
22 changes: 14 additions & 8 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,13 @@ impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
let children = self.arc_children();
if !children.is_empty() {
let t = children.into_iter().map_till_continue_and_collect(f)?;
// TODO: once we trust `t.transformed` don't create new node if not necessary
// TODO: Currently `t.transformed` quality comes from if the transformation
// closures fill the field correctly. Once we trust `t.transformed` we can
// remove the additional `t2` check.
// Please note that we need to propagate up `t.tnr` though.
let arc_self = Arc::clone(&self);
self.with_new_arc_children(arc_self, t.data)
let t2 = self.with_new_arc_children(arc_self, t.data)?;
Ok(Transformed::new(t2.data, t2.transformed, t.tnr))
} else {
Ok(Transformed::no(self))
}
Expand All @@ -499,7 +503,7 @@ pub trait ConcreteTreeNode: Sized {
fn take_children(self) -> (Self, Vec<Self>);

/// Reattaches updated child nodes to the node, returning the updated node.
fn with_new_children(self, children: Vec<Self>) -> Result<Self>;
fn with_new_children(self, children: Vec<Self>) -> Result<Transformed<Self>>;
}

impl<T: ConcreteTreeNode> TreeNode for T {
Expand All @@ -520,11 +524,13 @@ impl<T: ConcreteTreeNode> TreeNode for T {
{
let (new_self, children) = self.take_children();
if !children.is_empty() {
children
.into_iter()
.map_till_continue_and_collect(f)?
// TODO: once we trust `transformed` don't create new node if not necessary
.flat_map_data(|new_children| new_self.with_new_children(new_children))
let t = children.into_iter().map_till_continue_and_collect(f)?;
// TODO: Currently `t.transformed` quality comes from if the transformation
// closures fill the field correctly. Once we trust `t.transformed` we can
// remove the additional `t2` check.
// Please note that we need to propagate up `t.tnr` though.
let t2 = new_self.with_new_children(t.data)?;
Ok(Transformed::new(t2.data, t2.transformed, t.tnr))
} else {
Ok(Transformed::no(new_self))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ fn replace_order_preserving_variants(
}
}

context.update_plan_from_children()
context.update_plan_from_children().map(|t| t.data)
}

/// This utility function adds a [`SortExec`] above an operator according to the
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ fn update_sort_ctx_children(
}

node.data = data;
node.update_plan_from_children()
node.update_plan_from_children().map(|t| t.data)
}

/// This object is used within the [`EnforceSorting`] rule to track the closest
Expand Down Expand Up @@ -477,7 +477,7 @@ fn remove_corresponding_coalesce_in_sub_plan(
.collect::<Result<_>>()?;
}

requirements.update_plan_from_children()
requirements.update_plan_from_children().map(|t| t.data)
}

/// Updates child to remove the unnecessary sort below it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn plan_with_order_preserving_variants(
}
}

sort_input.update_plan_from_children()
sort_input.update_plan_from_children().map(|t| t.data)
}

/// Calculates the updated plan by replacing operators that preserve ordering
Expand Down Expand Up @@ -184,7 +184,7 @@ fn plan_with_order_breaking_variants(
let coalesce = CoalescePartitionsExec::new(child);
sort_input.plan = Arc::new(coalesce) as _;
} else {
return sort_input.update_plan_from_children();
return sort_input.update_plan_from_children().map(|t| t.data);
}

sort_input.children[0].data = false;
Expand Down
11 changes: 7 additions & 4 deletions datafusion/expr/src/tree_node/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@ impl TreeNode for LogicalPlan {
.iter()
.map(|c| (*c).clone())
.map_till_continue_and_collect(f)?;
// TODO: once we trust `t.transformed` remove additional check
if old_children
// TODO: Currently `t.transformed` quality comes from if the transformation
// closures fill the field correctly. Once we trust `t.transformed` we can remove
// the additional `t2` check.
// Please note that we need to propagate up `t.tnr` though.
let t2 = old_children
.into_iter()
.zip(t.data.iter())
.any(|(c1, c2)| c1 != c2)
{
.any(|(c1, c2)| c1 != c2);
if t2 {
Ok(Transformed::new(
self.with_new_exprs(self.expressions(), t.data)?,
true,
Expand Down
9 changes: 5 additions & 4 deletions datafusion/physical-expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ impl<T> ExprContext<T> {
}
}

pub fn update_expr_from_children(mut self) -> Result<Self> {
pub fn update_expr_from_children(mut self) -> Result<Transformed<Self>> {
let children_expr = self.children.iter().map(|c| c.expr.clone()).collect();
self.expr = with_new_children_if_necessary(self.expr, children_expr)?.data;
Ok(self)
let t = with_new_children_if_necessary(self.expr, children_expr)?;
self.expr = t.data;
Ok(Transformed::new(self, t.transformed, t.tnr))
}
}

Expand Down Expand Up @@ -93,7 +94,7 @@ impl<T> ConcreteTreeNode for ExprContext<T> {
(self, children)
}

fn with_new_children(mut self, children: Vec<Self>) -> Result<Self> {
fn with_new_children(mut self, children: Vec<Self>) -> Result<Transformed<Self>> {
self.children = children;
self.update_expr_from_children()
}
Expand Down
9 changes: 5 additions & 4 deletions datafusion/physical-plan/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ impl<T> PlanContext<T> {
}
}

pub fn update_plan_from_children(mut self) -> Result<Self> {
pub fn update_plan_from_children(mut self) -> Result<Transformed<Self>> {
let children_plans = self.children.iter().map(|c| c.plan.clone()).collect();
self.plan = with_new_children_if_necessary(self.plan, children_plans)?.data;
Ok(self)
let t = with_new_children_if_necessary(self.plan, children_plans)?;
self.plan = t.data;
Ok(Transformed::new(self, t.transformed, t.tnr))
}
}

Expand Down Expand Up @@ -94,7 +95,7 @@ impl<T> ConcreteTreeNode for PlanContext<T> {
(self, children)
}

fn with_new_children(mut self, children: Vec<Self>) -> Result<Self> {
fn with_new_children(mut self, children: Vec<Self>) -> Result<Transformed<Self>> {
self.children = children;
self.update_plan_from_children()
}
Expand Down

0 comments on commit 3653aa6

Please sign in to comment.