Skip to content

Commit

Permalink
Remove Expr clones from SortExprs
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Nov 5, 2024
1 parent 2e52580 commit ed51689
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
11 changes: 9 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use indexmap::IndexSet;

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::tree_node::replace_sort_expressions;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -866,7 +865,15 @@ impl LogicalPlan {
}) => {
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Sort(Sort {
expr: replace_sort_expressions(sort_expr.clone(), expr),
expr: expr
.into_iter()
.zip(sort_expr.iter())
.map(|(expr, sort)| SortExpr {
expr,
asc: sort.asc,
nulls_first: sort.nulls_first,
})
.collect(),
input: Arc::new(input),
fetch: *fetch,
}))
Expand Down
28 changes: 4 additions & 24 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,29 +408,9 @@ pub fn transform_sort_option_vec<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
/// Transforms an vector of sort expressions by applying the provided closure `f`.
pub fn transform_sort_vec<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
sorts: Vec<Sort>,
mut f: &mut F,
f: &mut F,
) -> Result<Transformed<Vec<Sort>>> {
Ok(sorts
.iter()
.map(|sort| sort.expr.clone())
.map_until_stop_and_collect(&mut f)?
.update_data(|transformed_exprs| {
replace_sort_expressions(sorts, transformed_exprs)
}))
}

pub fn replace_sort_expressions(sorts: Vec<Sort>, new_expr: Vec<Expr>) -> Vec<Sort> {
assert_eq!(sorts.len(), new_expr.len());
sorts
.into_iter()
.zip(new_expr)
.map(|(sort, expr)| replace_sort_expression(sort, expr))
.collect()
}

pub fn replace_sort_expression(sort: Sort, new_expr: Expr) -> Sort {
Sort {
expr: new_expr,
..sort
}
sorts.into_iter().map_until_stop_and_collect(|s| {
Ok(f(s.expr)?.update_data(|e| Sort { expr: e, ..s }))
})
}
19 changes: 15 additions & 4 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ use datafusion_expr::expr::{Alias, ScalarFunction};
use datafusion_expr::logical_plan::{
Aggregate, Filter, LogicalPlan, Projection, Sort, Window,
};
use datafusion_expr::tree_node::replace_sort_expressions;
use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator};
use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator, SortExpr};

const CSE_PREFIX: &str = "__common_expr";

Expand Down Expand Up @@ -91,19 +90,31 @@ impl CommonSubexprEliminate {
.map(LogicalPlan::Projection)
})
}

fn try_optimize_sort(
&self,
sort: Sort,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let Sort { expr, input, fetch } = sort;
let input = Arc::unwrap_or_clone(input);
let sort_expressions = expr.iter().map(|sort| sort.expr.clone()).collect();
let (sort_expressions, sort_params): (Vec<_>, Vec<(_, _)>) = expr
.into_iter()
.map(|sort| (sort.expr, (sort.asc, sort.nulls_first)))
.unzip();
let new_sort = self
.try_unary_plan(sort_expressions, input, config)?
.update_data(|(new_expr, new_input)| {
LogicalPlan::Sort(Sort {
expr: replace_sort_expressions(expr, new_expr),
expr: new_expr
.into_iter()
.zip(sort_params.into_iter())
.map(|(expr, (asc, nulls_first))| SortExpr {
expr,
asc,
nulls_first,
})
.collect(),
input: Arc::new(new_input),
fetch,
})
Expand Down

0 comments on commit ed51689

Please sign in to comment.