Skip to content

Commit

Permalink
extend Skip (Prune) functionality to Jump as it is defined in https:/…
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Feb 10, 2024
1 parent 9c0ec41 commit dcf0189
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 57 deletions.
73 changes: 33 additions & 40 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
use crate::Result;

/// If the function returns [`TreeNodeRecursion::Continue`], the normal execution of the
/// function continues. If it returns [`TreeNodeRecursion::Skip`], the function returns
/// function continues. If it returns [`TreeNodeRecursion::Jump`], the function returns
/// with [`TreeNodeRecursion::Continue`] to jump next recursion step, bypassing further
/// exploration of the current step. In case of [`TreeNodeRecursion::Stop`], the function
/// return with [`TreeNodeRecursion::Stop`] and recursion halts.
Expand All @@ -34,7 +34,7 @@ macro_rules! handle_tree_recursion {
TreeNodeRecursion::Continue => {}
// If the recursion should skip, do not apply to its children, let
// the recursion continue:
TreeNodeRecursion::Skip => return Ok(TreeNodeRecursion::Continue),
TreeNodeRecursion::Jump => return Ok(TreeNodeRecursion::Continue),
// If the recursion should stop, do not apply to its children:
TreeNodeRecursion::Stop => return Ok(TreeNodeRecursion::Stop),
}
Expand Down Expand Up @@ -135,7 +135,7 @@ pub trait TreeNode: Sized {
FD: FnMut(Self) -> Result<Transformed<Self>>,
FU: FnMut(Self) -> Result<Transformed<Self>>,
{
f_down(self)?.and_then_transform_children(|n| {
f_down(self)?.and_then_transform_on_continue(|n| {
n.map_children(|c| c.transform(f_down, f_up))?
.and_then_transform(f_up)
})
Expand All @@ -148,7 +148,8 @@ pub trait TreeNode: Sized {
where
F: Fn(Self) -> Result<Transformed<Self>>,
{
f(self)?.and_then_transform_children(|n| n.map_children(|c| c.transform_down(f)))
f(self)?
.and_then_transform_on_continue(|n| n.map_children(|c| c.transform_down(f)))
}

/// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its
Expand All @@ -158,8 +159,9 @@ pub trait TreeNode: Sized {
where
F: FnMut(Self) -> Result<Transformed<Self>>,
{
f(self)?
.and_then_transform_children(|n| n.map_children(|c| c.transform_down_mut(f)))
f(self)?.and_then_transform_on_continue(|n| {
n.map_children(|c| c.transform_down_mut(f))
})
}

/// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its
Expand Down Expand Up @@ -212,7 +214,7 @@ pub trait TreeNode: Sized {
self,
rewriter: &mut R,
) -> Result<Transformed<Self>> {
rewriter.f_down(self)?.and_then_transform_children(|n| {
rewriter.f_down(self)?.and_then_transform_on_continue(|n| {
n.map_children(|c| c.rewrite(rewriter))?
.and_then_transform(|n| rewriter.f_up(n))
})
Expand Down Expand Up @@ -252,7 +254,7 @@ pub trait TreeNode: Sized {
/// siblings of that tree node are visited, nor is post_visit
/// called on its parent tree node
///
/// If [`TreeNodeRecursion::Skip`] is returned on a call to pre_visit, no
/// If [`TreeNodeRecursion::Jump`] is returned on a call to pre_visit, no
/// children of that tree node are visited.
pub trait TreeNodeVisitor: Sized {
/// The node type which is visitable.
Expand Down Expand Up @@ -294,12 +296,12 @@ pub enum TreeNodeRecursion {
/// Continue recursion with the next node.
Continue,

/// Do not recurse into children.
/// Has effect only if returned from top-down transform closures or
/// [`TreeNodeVisitor::pre_visit`] or [`TreeNodeRewriter::f_down`].
/// If returned from bottom-up transform closures or [`TreeNodeVisitor::post_visit`] or
/// [`TreeNodeRewriter::f_up`] then works as [`TreeNodeRecursion::Continue`].
Skip,
/// In top-down traversals skip recursing into children but continue with the next
/// node, which actually means pruning of the subtree.
/// In bottom-up traversals bypass calling bottom-up closures till the next leaf node.
/// In combined traversals bypass calling bottom-up closures till the first top-down
/// closure.
Jump,

/// Stop recursion.
Stop,
Expand Down Expand Up @@ -358,14 +360,12 @@ impl<T> Transformed<T> {
fn and_then<F: FnOnce(T) -> Result<Transformed<T>>>(
self,
f: F,
children: bool,
return_continue_on_jump: bool,
) -> Result<Transformed<T>> {
match self.tnr {
TreeNodeRecursion::Continue => {}
TreeNodeRecursion::Skip => {
// If the next transformation would happen on children return immediately
// on `Skip`.
if children {
TreeNodeRecursion::Jump => {
if return_continue_on_jump {
return Ok(Transformed {
tnr: TreeNodeRecursion::Continue,
..self
Expand All @@ -388,7 +388,7 @@ impl<T> Transformed<T> {
self.and_then(f, false)
}

pub fn and_then_transform_children<F: FnOnce(T) -> Result<Transformed<T>>>(
pub fn and_then_transform_on_continue<F: FnOnce(T) -> Result<Transformed<T>>>(
self,
f: F,
) -> Result<Transformed<T>> {
Expand Down Expand Up @@ -418,26 +418,19 @@ impl<I: Iterator> TransformedIterator for I {
let mut new_transformed = false;
let new_data = self
.map(|i| {
if new_tnr == TreeNodeRecursion::Continue
|| new_tnr == TreeNodeRecursion::Skip
{
let Transformed {
data,
transformed,
tnr,
} = f(i)?;
new_tnr = if tnr == TreeNodeRecursion::Skip {
// Iterator always considers the elements as siblings so `Skip`
// can be safely converted to `Continue`.
TreeNodeRecursion::Continue
} else {
tnr
};
new_transformed |= transformed;
Ok(data)
} else {
Ok(i)
}
Ok(match new_tnr {
TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {
let Transformed {
data,
transformed,
tnr,
} = f(i)?;
new_tnr = tnr;
new_transformed |= transformed;
data
}
TreeNodeRecursion::Stop => i,
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Transformed {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(name);
if is_applicable {
Ok(TreeNodeRecursion::Skip)
Ok(TreeNodeRecursion::Jump)
} else {
Ok(TreeNodeRecursion::Stop)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {

if DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
return Ok(Transformed::new(node, false, TreeNodeRecursion::Skip));
return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
}
} else if self.table_schema.index_of(column.name()).is_err() {
// If the column does not exist in the (un-projected) table schema then
// it must be a projected column.
self.projected_columns = true;
return Ok(Transformed::new(node, false, TreeNodeRecursion::Skip));
return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,12 +652,12 @@ impl LogicalPlan {
| Expr::ScalarSubquery(_)
| Expr::InSubquery(_) => {
// subqueries could contain aliases so we don't recurse into those
Ok(Transformed::new(expr, false, TreeNodeRecursion::Skip))
Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
}
Expr::Alias(_) => Ok(Transformed::new(
expr.unalias(),
true,
TreeNodeRecursion::Skip,
TreeNodeRecursion::Jump,
)),
_ => Ok(Transformed::no(expr)),
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ where
exprs.push(expr.clone())
}
// stop recursing down this expr once we find a match
return Ok(TreeNodeRecursion::Skip);
return Ok(TreeNodeRecursion::Jump);
}

Ok(TreeNodeRecursion::Continue)
Expand Down
12 changes: 6 additions & 6 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
// related to https://github.com/apache/arrow-datafusion/issues/8814
// If the expr contain volatile expression or is a short-circuit expression, skip it.
if expr.short_circuits() || is_volatile_expression(expr)? {
return Ok(TreeNodeRecursion::Skip);
return Ok(TreeNodeRecursion::Jump);
}
self.visit_stack
.push(VisitRecord::EnterMark(self.node_count));
Expand Down Expand Up @@ -750,12 +750,12 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
// the `id_array`, which records the expr's identifier used to rewrite expr. So if we
// skip an expr in `ExprIdentifierVisitor`, we should skip it here, too.
if expr.short_circuits() || is_volatile_expression(&expr)? {
return Ok(Transformed::new(expr, false, TreeNodeRecursion::Skip));
return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
}
if self.curr_index >= self.id_array.len()
|| self.max_series_number > self.id_array[self.curr_index].0
{
return Ok(Transformed::new(expr, false, TreeNodeRecursion::Skip));
return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
}

let curr_id = &self.id_array[self.curr_index].1;
Expand All @@ -774,7 +774,7 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
return Ok(Transformed::new(
expr,
false,
TreeNodeRecursion::Skip,
TreeNodeRecursion::Jump,
));
}

Expand All @@ -791,7 +791,7 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
return Ok(Transformed::new(
expr,
false,
TreeNodeRecursion::Skip,
TreeNodeRecursion::Jump,
));
}

Expand All @@ -810,7 +810,7 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
Ok(Transformed::new(
col(id).alias(expr_name),
true,
TreeNodeRecursion::Skip,
TreeNodeRecursion::Jump,
))
} else {
self.curr_index += 1;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr {
if plan_hold_outer {
// the unsupported case
self.can_pull_up = false;
Ok(Transformed::new(plan, false, TreeNodeRecursion::Skip))
Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
} else {
Ok(Transformed::no(plan))
}
Expand All @@ -77,15 +77,15 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr {
(false, true) => {
// the unsupported case
self.can_pull_up = false;
Ok(Transformed::new(plan, false, TreeNodeRecursion::Skip))
Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
}
_ => Ok(Transformed::no(plan)),
}
}
_ if plan.expressions().iter().any(|expr| expr.contains_outer()) => {
// the unsupported cases, the plan expressions contain out reference columns(like window expressions)
self.can_pull_up = false;
Ok(Transformed::new(plan, false, TreeNodeRecursion::Skip))
Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
}
_ => Ok(Transformed::no(plan)),
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
Expr::Column(_)
| Expr::Literal(_)
| Expr::Placeholder(_)
| Expr::ScalarVariable(_, _) => Ok(TreeNodeRecursion::Skip),
| Expr::ScalarVariable(_, _) => Ok(TreeNodeRecursion::Jump),
Expr::Exists { .. }
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl TreeNodeRewriter for ExtractScalarSubQuery {
subqry_alias,
)?),
true,
TreeNodeRecursion::Skip,
TreeNodeRecursion::Jump,
))
}
_ => Ok(Transformed::no(expr)),
Expand Down

0 comments on commit dcf0189

Please sign in to comment.