Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a common trait TreeNode for ExecutionPlan, PhysicalExpr, LogicalExpr, LogicalPlan #5630

Merged
merged 19 commits into from
Mar 27, 2023

Conversation

yahoNanJing
Copy link
Contributor

@yahoNanJing yahoNanJing commented Mar 17, 2023

Which issue does this PR close?

Closes #5609.

Rationale for this change

Currently, there are many duplicated code related to the visitor trait and rewrite trait. And the trait methods are sometimes misused.

What changes are included in this PR?

Combine the visitor and rewrite trait to be one trait TreeNode and provide common behavior for ExecutionPlan, PhysicalExpr, LogicalExpr, LogicalPlan. And this trait contains the following methods:

  • fn get_children(&self) -> Vec<Self>; for getting its children
  • fn collect<F>(&self, op: &mut F) -> Result<()> where F: FnMut(&Self) -> Result<Recursion>, for collecting info from the tree node by a closure
  • fn visit<V: TreeNodeVisitor<N = Self>>(&self, visitor: &mut V) -> Result<Recursion> for collecting info from the tree node by a visitor. This is used when need to collect info from a specific post_visit. If only pre_visit is needed, then the collect method is preferred.
  • fn transform_down<F>(self, op: &F) -> Result<Self> where F: Fn(Self) -> Result<Option<Self>>, for rewriting the node from top to down
  • fn transform_up<F>(self, op: &F) -> Result<Self> where F: Fn(Self) -> Result<Option<Self>>, for rewriting the node from bottom to up
  • fn rewrite<R: TreeNodeRewriter<N = Self>>(self, rewriter: &mut R) -> Result<Self> for rewriting the node from top to down. This is used when need to invoke a specific pre_visit. Otherwise, either the transform_down or transform_up is preferred.
  • fn map_children<F>(self, transform: F) -> Result<Self> where F: FnMut(Self) -> Result<Self> is for replacing all of its children with the rewritted ones

Are these changes tested?

Are there any user-facing changes?

@yahoNanJing yahoNanJing marked this pull request as draft March 17, 2023 11:57
@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions labels Mar 17, 2023
@yahoNanJing yahoNanJing changed the title Issue 5609 Introduce a common trait TreeNode for ExecutionPlan, PhysicalExpr, LogicalExpr, LogicalPlan Mar 17, 2023
@yahoNanJing
Copy link
Contributor Author

Due to commit 258af4b, I may need to rebase to latest code and do a few refinements.

@yahoNanJing yahoNanJing requested a review from alamb March 17, 2023 11:59
@mingmwang
Copy link
Contributor

I will take a closer look tomorrow.

@alamb
Copy link
Contributor

alamb commented Mar 18, 2023

Combine the visitor and rewrite trait to be one trait TreeNode and provide common behavior for ExecutionPlan, PhysicalExpr, LogicalExpr, LogicalPlan. And this trait contains the following methods:

I believe Expr (logical exprs) are also included

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for cleaning this up @yahoNanJing -- I really like how the same trait is used for visiting all the various trees that are in DataFusion.

It would be awesome to figure out how to avoid the duplication in the physical exprs and physical plans

This would be a fairly non trivial API change (I think for the better) for all users of DataFusion that have rewriters; This is probably fine but I think it would be good to communicate this more broadly with people who have systems built on DataFusion for them to weigh in as well.

When it is ready for review, we can ping them and do some more communocation.

/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.collect(&mut |expr| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// [`TreeNodeVisitor`] allows keeping the algorithms
/// separate from the code to traverse the structure of the `TreeNode`
/// tree and makes it easier to add new types of tree node and
/// algorithms by.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// algorithms by.
/// algorithms.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API looks very nice 👌

impl TreeNodeRewritable for PlanWithKeyRequirements {
impl TreeNode for PlanWithKeyRequirements {
fn get_children(&self) -> Vec<Self> {
unimplemented!()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be implemented?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also have the same question.

//! This module provides common traits for visiting or rewriting tree nodes easily.
//!
//! It's a duplication of the one in the crate `datafusion-common`.
//! In the future, if the Orphan rule is relaxed for Arc<dyn T>, these duplicated codes can be removed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if we could figure out how to avoid this duplication and the one in physical-expr?

Maybe we could create a blanket impl in datafusion-common like

impl<T: TreeNode> TreeNode for Arc<T> {
...
}

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alamb. However, I'm still not sure how to apply to the trait ExecutionPlan and PhysicalExpr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with this and I came up with a solution -- here is a proposed modification to this PR: yahoNanJing#70

where
F: Fn(Self) -> Result<Option<Self>>,
{
let node_cloned = self.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove cloning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comments. I'm afraid it's not feasible because of the constraint of F: Fn(Self) -> Result<Option<Self>>,

Copy link
Contributor

@mingmwang mingmwang Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F will consume the Self.

Copy link
Contributor

@alamb alamb Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this clone could be quite bad for performance. For Arcd PhysicalExprs and ExecutionPlans, it is very cheap, but for Expr it means any code that uses this will effectively deep copy (recursively) the entire tree at each step (N^2 performance).

I think the root cause is the signature

F: Fn(Self) -> Result<Option<Self>>,

Which I think is being used to signal if the node was changed or not

In order to avoid the clone, I think we need to use something other than Option so the original can be returned when the change is not made.

How about something like

enum Transformed<T> {
  /// The item was transformed / rewritten somehow
  Yes(T),
  /// The item was not transformed
  No(T)
}

impl<T> From<Transformed<T>> for <T> {
  fn from(transformed: Transformed<T>) -> T {
    match transformed {
      Yes(t) => t,
      No(t) => t,
    }
}
  

And then this function would look something like

    fn transform_down<F>(self, op: &F) -> Result<Self>
    where
        F: Fn(Self) -> Result<Transformed<Self>>,
    {
        // this could be cleaner using the `From` impl
        let after_op = match op(node_cloned)? {
            Yes(value) => value,
            No(value) => value,
        };
        after_op.map_children(|node| node.transform_down(op))
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @alamb that this is due to the chosen signature, not an unavoidable clone -- and we should fix it. Same problem in exists in transform_up function too. When no change is made, the cloning is done wastefully.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am particular concerned that this trait will be applied to Exprs (where as LogicalPlans and PhysicalPlans already have Arc's in them so the cost is not that high)

I would be willing to try and change the existing LogicalPlan optimizer (which I think might be causing this problems)

@jackwener and I were just discussing a similar issue with #5705 (comment)

Comment on lines 27 to 30
/// Use preorder to iterate the node on the tree so that we can stop fast for some cases.
///
/// `op` can be used to collect some info from the tree node.
fn collect<F>(&self, op: &mut F) -> Result<()>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collect usually refers to apply some func to each element and the returned result should be a container type.

Copy link
Contributor Author

@yahoNanJing yahoNanJing Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion. Maybe it's better to change the name to for_each.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be try_for_each ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I prefer to use apply. And later I'll remove the get_children and add apply_children.

Comment on lines 85 to 87
return Err(DataFusionError::Execution(format!(
"Recursion {r:?} is not supported for collect_using"
)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change the error msg:

Recursion {r:?} is not supported for visit()

}
};

for child in self.get_children() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use lambda instead of the for loop ?

Comment on lines 34 to 43
match op(self)? {
Recursion::Continue => {}
// If the recursion should stop, do not visit children
Recursion::Stop => return Ok(()),
r => {
return Err(DataFusionError::Execution(format!(
"Recursion {r:?} is not supported for collect"
)))
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can consider using the ControlFlow from rust std ?

pub enum ControlFlow<B, C = ()> {
    /// Move on to the next phase of the operation as normal.
    #[stable(feature = "control_flow_enum_type", since = "1.55.0")]
    #[lang = "Continue"]
    Continue(C),
    /// Exit the operation without running subsequent phases.
    #[stable(feature = "control_flow_enum_type", since = "1.55.0")]
    #[lang = "Break"]
    Break(B),
    // Yes, the order of the variants doesn't match the type parameters.
    // They're in this order so that `ControlFlow<A, B>` <-> `Result<B, A>`
    // is a no-op conversion in the `Try` implementation.
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. I'll refine for it.

}

/// Trait for potentially recursively transform an [`TreeNode`] node
/// tree. When passed to `TreeNode::transform_using`, `TreeNodeRewriter::mutate` is
/// invoked recursively on all nodes of a tree.
Copy link
Contributor

@mingmwang mingmwang Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no transform_using method any more. Please fix the comment.

Comment on lines 82 to 84
fn get_children(&self) -> Vec<Self> {
unimplemented!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is some kind of anti pattern.

use crate::Expr;
use datafusion_common::{tree_node::TreeNode, Result};

impl TreeNode for Expr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some kind of duplication logic, I think maybe you can provide a default implementation for map_children in the TreeNode trait.


/// &mut transform a `Vec` of `Expr`s
fn transform_vec<F>(v: Vec<Expr>, transform: &mut F) -> Result<Vec<Expr>>
where
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use slice or IntoIterator.

@yahoNanJing
Copy link
Contributor Author

Hi @alamb, this PR is ready for review now. Could you help ping related guys for helping reviewing this PR?

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @yahoNanJing for pushing this forward. I really like this PR except for:

  1. The cloning by default in transform_down (I have left some comments on how maybe to fix that)
  2. The replication of the trait -- I have a proposal of how to avoid the duplication in Remove duplicated TreeNode traits in physical-expr and physical plan yahoNanJing/arrow-datafusion#70

where
F: Fn(Self) -> Result<Option<Self>>,
{
let node_cloned = self.clone();
Copy link
Contributor

@alamb alamb Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this clone could be quite bad for performance. For Arcd PhysicalExprs and ExecutionPlans, it is very cheap, but for Expr it means any code that uses this will effectively deep copy (recursively) the entire tree at each step (N^2 performance).

I think the root cause is the signature

F: Fn(Self) -> Result<Option<Self>>,

Which I think is being used to signal if the node was changed or not

In order to avoid the clone, I think we need to use something other than Option so the original can be returned when the change is not made.

How about something like

enum Transformed<T> {
  /// The item was transformed / rewritten somehow
  Yes(T),
  /// The item was not transformed
  No(T)
}

impl<T> From<Transformed<T>> for <T> {
  fn from(transformed: Transformed<T>) -> T {
    match transformed {
      Yes(t) => t,
      No(t) => t,
    }
}
  

And then this function would look something like

    fn transform_down<F>(self, op: &F) -> Result<Self>
    where
        F: Fn(Self) -> Result<Transformed<Self>>,
    {
        // this could be cleaner using the `From` impl
        let after_op = match op(node_cloned)? {
            Yes(value) => value,
            No(value) => value,
        };
        after_op.map_children(|node| node.transform_down(op))
    }

//! This module provides common traits for visiting or rewriting tree nodes easily.
//!
//! It's a duplication of the one in the crate `datafusion-common`.
//! In the future, if the Orphan rule is relaxed for Arc<dyn T>, these duplicated codes can be removed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with this and I came up with a solution -- here is a proposed modification to this PR: yahoNanJing#70

@jackwener jackwener self-requested a review March 24, 2023 02:52
yahoNanJing and others added 2 commits March 24, 2023 13:18
@jackwener jackwener removed their request for review March 24, 2023 11:31
@github-actions github-actions bot added the sql SQL Planner label Mar 24, 2023
@yahoNanJing
Copy link
Contributor Author

Thanks @alamb. According to your advice, the clone() and duplicated [TreeNode] definition have been removed. If there's no big issues, I suggest to merge this PR as soon as possible so that others can leverage the common trait as soon as possible. And another benefit would be less effort to deal with the conflicts, since there're so many files involved in this PR.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks great @yahoNanJing -- thank you so much.

I recommend we merge this PR as soon as we are good with the 21.0.0 release. There is a minor issue reported on the mailing list here: https://lists.apache.org/thread/fnnmrn83spnb2y2l2vdw2v1hd54pfjl7

@alamb alamb added the api change Changes the API exposed to users of the crate label Mar 25, 2023
@alamb
Copy link
Contributor

alamb commented Mar 25, 2023

Unless anyone else has thoughts on this PR I plan to merge it monday

@alamb alamb merged commit 8df18ab into apache:main Mar 27, 2023
}
}

/// Helper trait for implementing [`TreeNode`] that have children stored as Arc's
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very clever design. I tried this refactor before and somewhat gave up because of Rust's orphan rule (i.e. you cannot impl Trait for Arc<T> if trait is from a different crate).

@mingmwang
Copy link
Contributor

Looks really nice !

@mingmwang
Copy link
Contributor

@yahoNanJing
There are some other improvement we can do:

In the map_children() method of LogicalPlan, we can avoid construct the plan again and again if the new children has no changes compared to the old inputs.

    fn map_children<F>(self, transform: F) -> Result<Self>
    where
        F: FnMut(Self) -> Result<Self>,
    {
        let children = self.inputs().into_iter().cloned().collect::<Vec<_>>();
        if !children.is_empty() {
            let new_children: Result<Vec<_>> =
                children.into_iter().map(transform).collect();
            self.with_new_inputs(new_children?.as_slice())
        } else {
            Ok(self)
        }
    }

We have the with_new_children_if_necessary utils for Arc<dyn ExecutionPlan> and Arc<dyn PhysicalExpr>

@mingmwang
Copy link
Contributor

I think this method can be removed.

pub fn optimize_children(
    optimizer: &impl PhysicalOptimizerRule,
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    let children = plan
        .children()
        .iter()
        .map(|child| optimizer.optimize(Arc::clone(child), config))
        .collect::<Result<Vec<_>>>()?;

    if children.is_empty() {
        Ok(Arc::clone(&plan))
    } else {
        with_new_children_if_necessary(plan, children).map(Transformed::into)
    }
}

@alamb
Copy link
Contributor

alamb commented Mar 28, 2023

In the map_children() method of LogicalPlan, we can avoid construct the plan again and again if the new children has no changes compared to the old inputs.

in #5761

@alamb
Copy link
Contributor

alamb commented Mar 28, 2023

I think this method can be removed.

Filed #5762 to track

Thanks for the comments @mingmwang

@alamb
Copy link
Contributor

alamb commented Mar 28, 2023

FWIW I upgraded IOx (which uses the rewriters heavily) to this rewriter code in https://github.com/influxdata/influxdb_iox/pull/7353 -- it was some non trivial API churn but I think resulting code is much nicer 👌

Thanks again @yahoNanJing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Introduce a common trait TreeNode for ExecutionPlan, PhysicalExpr, LogicalExpr, LogicalPlan
7 participants