Skip to content

Commit

Permalink
Remove Expr clones from SortExprs
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Nov 5, 2024
1 parent 2e52580 commit 47df15a
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 32 deletions.
3 changes: 1 addition & 2 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ use datafusion::{
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::tree_node::replace_sort_expression;
use datafusion_expr::{FetchType, Projection, SortExpr};
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::AnalyzerRule;
Expand Down Expand Up @@ -440,7 +439,7 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
Ok(Self {
k: self.k,
input: inputs.swap_remove(0),
expr: replace_sort_expression(self.expr.clone(), exprs.swap_remove(0)),
expr: self.expr.with_expr(exprs.swap_remove(0)),
})
}

Expand Down
8 changes: 8 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,14 @@ impl Sort {
nulls_first: !self.nulls_first,
}
}

pub fn with_expr(&self, expr: Expr) -> Self {
Self {
expr,
asc: self.asc,
nulls_first: self.nulls_first,
}
}
}

impl Display for Sort {
Expand Down
7 changes: 5 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use indexmap::IndexSet;

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::tree_node::replace_sort_expressions;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -866,7 +865,11 @@ impl LogicalPlan {
}) => {
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Sort(Sort {
expr: replace_sort_expressions(sort_expr.clone(), expr),
expr: expr
.into_iter()
.zip(sort_expr.iter())
.map(|(expr, sort)| sort.with_expr(expr))
.collect(),
input: Arc::new(input),
fetch: *fetch,
}))
Expand Down
28 changes: 4 additions & 24 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,29 +408,9 @@ pub fn transform_sort_option_vec<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
/// Transforms an vector of sort expressions by applying the provided closure `f`.
pub fn transform_sort_vec<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
sorts: Vec<Sort>,
mut f: &mut F,
f: &mut F,
) -> Result<Transformed<Vec<Sort>>> {
Ok(sorts
.iter()
.map(|sort| sort.expr.clone())
.map_until_stop_and_collect(&mut f)?
.update_data(|transformed_exprs| {
replace_sort_expressions(sorts, transformed_exprs)
}))
}

pub fn replace_sort_expressions(sorts: Vec<Sort>, new_expr: Vec<Expr>) -> Vec<Sort> {
assert_eq!(sorts.len(), new_expr.len());
sorts
.into_iter()
.zip(new_expr)
.map(|(sort, expr)| replace_sort_expression(sort, expr))
.collect()
}

pub fn replace_sort_expression(sort: Sort, new_expr: Expr) -> Sort {
Sort {
expr: new_expr,
..sort
}
sorts.into_iter().map_until_stop_and_collect(|s| {
Ok(f(s.expr)?.update_data(|e| Sort { expr: e, ..s }))
})
}
19 changes: 15 additions & 4 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ use datafusion_expr::expr::{Alias, ScalarFunction};
use datafusion_expr::logical_plan::{
Aggregate, Filter, LogicalPlan, Projection, Sort, Window,
};
use datafusion_expr::tree_node::replace_sort_expressions;
use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator};
use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator, SortExpr};

const CSE_PREFIX: &str = "__common_expr";

Expand Down Expand Up @@ -91,19 +90,31 @@ impl CommonSubexprEliminate {
.map(LogicalPlan::Projection)
})
}

fn try_optimize_sort(
&self,
sort: Sort,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let Sort { expr, input, fetch } = sort;
let input = Arc::unwrap_or_clone(input);
let sort_expressions = expr.iter().map(|sort| sort.expr.clone()).collect();
let (sort_expressions, sort_params): (Vec<_>, Vec<(_, _)>) = expr
.into_iter()
.map(|sort| (sort.expr, (sort.asc, sort.nulls_first)))
.unzip();
let new_sort = self
.try_unary_plan(sort_expressions, input, config)?
.update_data(|(new_expr, new_input)| {
LogicalPlan::Sort(Sort {
expr: replace_sort_expressions(expr, new_expr),
expr: new_expr
.into_iter()
.zip(sort_params)
.map(|(expr, (asc, nulls_first))| SortExpr {
expr,
asc,
nulls_first,
})
.collect(),
input: Arc::new(new_input),
fetch,
})
Expand Down

0 comments on commit 47df15a

Please sign in to comment.