Skip to content

Commit

Permalink
add map_until_stop_and_collect macro
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Mar 30, 2024
1 parent d896000 commit cf55bc6
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 135 deletions.
49 changes: 37 additions & 12 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl<T> Transformed<T> {
}
}

/// Transformation helper to process tree nodes that are siblings.
/// Transformation helper to process sequence of iterable tree nodes that are siblings.
pub trait TransformedIterator: Iterator {
fn map_until_stop_and_collect<
F: FnMut(Self::Item) -> Result<Transformed<Self::Item>>,
Expand All @@ -551,20 +551,45 @@ impl<I: Iterator> TransformedIterator for I {
) -> Result<Transformed<Vec<Self::Item>>> {
let mut tnr = TreeNodeRecursion::Continue;
let mut transformed = false;
let data = self
.map(|item| match tnr {
TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {
f(item).map(|result| {
self.map(|item| match tnr {
TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {
f(item).map(|result| {
tnr = result.tnr;
transformed |= result.transformed;
result.data
})
}
TreeNodeRecursion::Stop => Ok(item),
})
.collect::<Result<Vec<_>>>()
.map(|data| Transformed::new(data, transformed, tnr))
}
}

/// Transformation helper to process sequence of tree node containing expressions.
/// This macro is very similar to [TransformedIterator::map_until_stop_and_collect] to
/// process nodes that are siblings, but it accepts a sequence of pairs of an expression
/// and a transformation.
#[macro_export]
macro_rules! map_until_stop_and_collect {
($($EXPR:expr, $F:expr),*) => {{
let mut tnr = TreeNodeRecursion::Continue;
let mut transformed = false;
let data = (
$(
if tnr == TreeNodeRecursion::Continue || tnr == TreeNodeRecursion::Jump {
$F($EXPR).map(|result| {
tnr = result.tnr;
transformed |= result.transformed;
result.data
})
}
TreeNodeRecursion::Stop => Ok(item),
})
.collect::<Result<Vec<_>>>()?;
Ok(Transformed::new(data, transformed, tnr))
}
})?
} else {
$EXPR
},
)*
);
Transformed::new(data, transformed, tnr)
}}
}

/// Transformation helper to access [`Transformed`] fields in a [`Result`] easily.
Expand Down
236 changes: 113 additions & 123 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use crate::{Expr, GetFieldAccess};
use datafusion_common::tree_node::{
Transformed, TransformedIterator, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{handle_visit_recursion, internal_err, Result};
use datafusion_common::{
handle_visit_recursion, internal_err, map_until_stop_and_collect, DataFusionError,
Result,
};

impl TreeNode for Expr {
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
Expand Down Expand Up @@ -167,58 +170,58 @@ impl TreeNode for Expr {
Expr::InSubquery(InSubquery::new(be, subquery, negated))
}),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
transform_box(left, &mut f)?
.update_data(|new_left| (new_left, right))
.try_transform_node(|(new_left, right)| {
Ok(transform_box(right, &mut f)?
.update_data(|new_right| (new_left, new_right)))
})?
.update_data(|(new_left, new_right)| {
Expr::BinaryExpr(BinaryExpr::new(new_left, op, new_right))
})
map_until_stop_and_collect!(
left,
|left| transform_box(left, &mut f),
right,
|right| transform_box(right, &mut f)
)
.update_data(|(new_left, new_right)| {
Expr::BinaryExpr(BinaryExpr::new(new_left, op, new_right))
})
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => transform_box(expr, &mut f)?
.update_data(|new_expr| (new_expr, pattern))
.try_transform_node(|(new_expr, pattern)| {
Ok(transform_box(pattern, &mut f)?
.update_data(|new_pattern| (new_expr, new_pattern)))
})?
.update_data(|(new_expr, new_pattern)| {
Expr::Like(Like::new(
negated,
new_expr,
new_pattern,
escape_char,
case_insensitive,
))
}),
}) => map_until_stop_and_collect!(
expr,
|expr| transform_box(expr, &mut f),
pattern,
|pattern| transform_box(pattern, &mut f)
)
.update_data(|(new_expr, new_pattern)| {
Expr::Like(Like::new(
negated,
new_expr,
new_pattern,
escape_char,
case_insensitive,
))
}),
Expr::SimilarTo(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => transform_box(expr, &mut f)?
.update_data(|new_expr| (new_expr, pattern))
.try_transform_node(|(new_expr, pattern)| {
Ok(transform_box(pattern, &mut f)?
.update_data(|new_pattern| (new_expr, new_pattern)))
})?
.update_data(|(new_expr, new_pattern)| {
Expr::SimilarTo(Like::new(
negated,
new_expr,
new_pattern,
escape_char,
case_insensitive,
))
}),
}) => map_until_stop_and_collect!(
expr,
|expr| transform_box(expr, &mut f),
pattern,
|pattern| transform_box(pattern, &mut f)
)
.update_data(|(new_expr, new_pattern)| {
Expr::SimilarTo(Like::new(
negated,
new_expr,
new_pattern,
escape_char,
case_insensitive,
))
}),
Expr::Not(expr) => transform_box(expr, &mut f)?.update_data(Expr::Not),
Expr::IsNotNull(expr) => {
transform_box(expr, &mut f)?.update_data(Expr::IsNotNull)
Expand Down Expand Up @@ -248,48 +251,41 @@ impl TreeNode for Expr {
negated,
low,
high,
}) => transform_box(expr, &mut f)?
.update_data(|new_expr| (new_expr, low, high))
.try_transform_node(|(new_expr, low, high)| {
Ok(transform_box(low, &mut f)?
.update_data(|new_low| (new_expr, new_low, high)))
})?
.try_transform_node(|(new_expr, new_low, high)| {
Ok(transform_box(high, &mut f)?
.update_data(|new_high| (new_expr, new_low, new_high)))
})?
.update_data(|(new_expr, new_low, new_high)| {
Expr::Between(Between::new(new_expr, negated, new_low, new_high))
}),
}) => map_until_stop_and_collect!(
expr,
|expr| transform_box(expr, &mut f),
low,
|low| transform_box(low, &mut f),
high,
|high| transform_box(high, &mut f)
)
.update_data(|(new_expr, new_low, new_high)| {
Expr::Between(Between::new(new_expr, negated, new_low, new_high))
}),
Expr::Case(Case {
expr,
when_then_expr,
else_expr,
}) => transform_option_box(expr, &mut f)?
.update_data(|new_expr| (new_expr, when_then_expr, else_expr))
.try_transform_node(|(new_expr, when_then_expr, else_expr)| {
Ok(when_then_expr
.into_iter()
.map_until_stop_and_collect(|(when, then)| {
transform_box(when, &mut f)?
.update_data(|new_when| (new_when, then))
.try_transform_node(|(new_when, then)| {
Ok(transform_box(then, &mut f)?
.update_data(|new_then| (new_when, new_then)))
})
})?
.update_data(|new_when_then_expr| {
(new_expr, new_when_then_expr, else_expr)
}))
})?
.try_transform_node(|(new_expr, new_when_then_expr, else_expr)| {
Ok(transform_option_box(else_expr, &mut f)?.update_data(
|new_else_expr| (new_expr, new_when_then_expr, new_else_expr),
))
})?
.update_data(|(new_expr, new_when_then_expr, new_else_expr)| {
Expr::Case(Case::new(new_expr, new_when_then_expr, new_else_expr))
}),
}) => map_until_stop_and_collect!(
expr,
|expr| transform_option_box(expr, &mut f),
when_then_expr,
|when_then_expr: Vec<(Box<Expr>, Box<Expr>)>| when_then_expr
.into_iter()
.map_until_stop_and_collect(|(when, then)| {
Ok(map_until_stop_and_collect!(
when,
|when| transform_box(when, &mut f),
then,
|then| transform_box(then, &mut f)
))
}),
else_expr,
|else_expr| transform_option_box(else_expr, &mut f)
)
.update_data(|(new_expr, new_when_then_expr, new_else_expr)| {
Expr::Case(Case::new(new_expr, new_when_then_expr, new_else_expr))
}),
Expr::Cast(Cast { expr, data_type }) => transform_box(expr, &mut f)?
.update_data(|be| Expr::Cast(Cast::new(be, data_type))),
Expr::TryCast(TryCast { expr, data_type }) => transform_box(expr, &mut f)?
Expand Down Expand Up @@ -320,48 +316,41 @@ impl TreeNode for Expr {
order_by,
window_frame,
null_treatment,
}) => transform_vec(args, &mut f)?
.update_data(|new_args| (new_args, partition_by, order_by))
.try_transform_node(|(new_args, partition_by, order_by)| {
Ok(transform_vec(partition_by, &mut f)?.update_data(
|new_partition_by| (new_args, new_partition_by, order_by),
))
})?
.try_transform_node(|(new_args, new_partition_by, order_by)| {
Ok(
transform_vec(order_by, &mut f)?.update_data(|new_order_by| {
(new_args, new_partition_by, new_order_by)
}),
)
})?
.update_data(|(new_args, new_partition_by, new_order_by)| {
Expr::WindowFunction(WindowFunction::new(
fun,
new_args,
new_partition_by,
new_order_by,
window_frame,
null_treatment,
))
}),
}) => map_until_stop_and_collect!(
args,
|args| transform_vec(args, &mut f),
partition_by,
|partition_by| transform_vec(partition_by, &mut f),
order_by,
|order_by| transform_vec(order_by, &mut f)
)
.update_data(|(new_args, new_partition_by, new_order_by)| {
Expr::WindowFunction(WindowFunction::new(
fun,
new_args,
new_partition_by,
new_order_by,
window_frame,
null_treatment,
))
}),
Expr::AggregateFunction(AggregateFunction {
args,
func_def,
distinct,
filter,
order_by,
null_treatment,
}) => transform_vec(args, &mut f)?
.update_data(|new_args| (new_args, filter, order_by))
.try_transform_node(|(new_args, filter, order_by)| {
Ok(transform_option_box(filter, &mut f)?
.update_data(|new_filter| (new_args, new_filter, order_by)))
})?
.try_transform_node(|(new_args, new_filter, order_by)| {
Ok(transform_option_vec(order_by, &mut f)?
.update_data(|new_order_by| (new_args, new_filter, new_order_by)))
})?
.map_data(|(new_args, new_filter, new_order_by)| match func_def {
}) => map_until_stop_and_collect!(
args,
|args| transform_vec(args, &mut f),
filter,
|filter| transform_option_box(filter, &mut f),
order_by,
|order_by| transform_option_vec(order_by, &mut f)
)
.map_data(
|(new_args, new_filter, new_order_by)| match func_def {
AggregateFunctionDefinition::BuiltIn(fun) => {
Ok(Expr::AggregateFunction(AggregateFunction::new(
fun,
Expand All @@ -384,7 +373,8 @@ impl TreeNode for Expr {
AggregateFunctionDefinition::Name(_) => {
internal_err!("Function `Expr` with name should be resolved.")
}
})?,
},
)?,
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => transform_vec(exprs, &mut f)?
.update_data(|ve| Expr::GroupingSet(GroupingSet::Rollup(ve))),
Expand All @@ -401,15 +391,15 @@ impl TreeNode for Expr {
expr,
list,
negated,
}) => transform_box(expr, &mut f)?
.update_data(|new_expr| (new_expr, list))
.try_transform_node(|(new_expr, list)| {
Ok(transform_vec(list, &mut f)?
.update_data(|new_list| (new_expr, new_list)))
})?
.update_data(|(new_expr, new_list)| {
Expr::InList(InList::new(new_expr, new_list, negated))
}),
}) => map_until_stop_and_collect!(
expr,
|expr| transform_box(expr, &mut f),
list,
|list| transform_vec(list, &mut f)
)
.update_data(|(new_expr, new_list)| {
Expr::InList(InList::new(new_expr, new_list, negated))
}),
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
transform_box(expr, &mut f)?.update_data(|be| {
Expr::GetIndexedField(GetIndexedField::new(be, field))
Expand Down

0 comments on commit cf55bc6

Please sign in to comment.