Skip to content

Commit

Permalink
Remove some recursive clones (#9050)
Browse files Browse the repository at this point in the history
  • Loading branch information
ozankabak authored Jan 30, 2024
1 parent 488cfe1 commit ee9736f
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 186 deletions.
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ impl LogicalPlanBuilder {
)
})
.collect::<Result<Vec<_>>>()?;
curr_plan.with_new_exprs(curr_plan.expressions(), &new_inputs)
curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
}
}
}
Expand Down
74 changes: 37 additions & 37 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ impl LogicalPlan {
/// Returns a copy of this `LogicalPlan` with the new inputs
#[deprecated(since = "35.0.0", note = "please use `with_new_exprs` instead")]
pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> Result<LogicalPlan> {
self.with_new_exprs(self.expressions(), inputs)
self.with_new_exprs(self.expressions(), inputs.to_vec())
}

/// Returns a new `LogicalPlan` based on `self` with inputs and
Expand All @@ -590,13 +590,13 @@ impl LogicalPlan {
pub fn with_new_exprs(
&self,
mut expr: Vec<Expr>,
inputs: &[LogicalPlan],
mut inputs: Vec<LogicalPlan>,
) -> Result<LogicalPlan> {
match self {
// Since `expr` may differ from the previous expressions, the schema of the projection
// may change, so we must use `try_new` here instead of `try_new_with_schema`.
LogicalPlan::Projection(Projection { .. }) => {
Projection::try_new(expr, Arc::new(inputs[0].clone()))
Projection::try_new(expr, Arc::new(inputs.swap_remove(0)))
.map(LogicalPlan::Projection)
}
LogicalPlan::Dml(DmlStatement {
Expand All @@ -608,7 +608,7 @@ impl LogicalPlan {
table_name: table_name.clone(),
table_schema: table_schema.clone(),
op: op.clone(),
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
})),
LogicalPlan::Copy(CopyTo {
input: _,
Expand All @@ -617,7 +617,7 @@ impl LogicalPlan {
copy_options,
single_file_output,
}) => Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
output_url: output_url.clone(),
file_format: file_format.clone(),
single_file_output: *single_file_output,
Expand All @@ -629,7 +629,7 @@ impl LogicalPlan {
values: expr
.chunks_exact(schema.fields().len())
.map(|s| s.to_vec())
.collect::<Vec<_>>(),
.collect(),
}))
}
LogicalPlan::Filter { .. } => {
Expand Down Expand Up @@ -674,7 +674,7 @@ impl LogicalPlan {
let mut remove_aliases = RemoveAliases {};
let predicate = predicate.rewrite(&mut remove_aliases)?;

Filter::try_new(predicate, Arc::new(inputs[0].clone()))
Filter::try_new(predicate, Arc::new(inputs.swap_remove(0)))
.map(LogicalPlan::Filter)
}
LogicalPlan::Repartition(Repartition {
Expand All @@ -684,35 +684,35 @@ impl LogicalPlan {
Partitioning::RoundRobinBatch(n) => {
Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::RoundRobinBatch(*n),
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
}))
}
Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::Hash(expr, *n),
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
})),
Partitioning::DistributeBy(_) => {
Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::DistributeBy(expr),
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
}))
}
},
LogicalPlan::Window(Window { window_expr, .. }) => {
assert_eq!(window_expr.len(), expr.len());
Window::try_new(expr, Arc::new(inputs[0].clone()))
Window::try_new(expr, Arc::new(inputs.swap_remove(0)))
.map(LogicalPlan::Window)
}
LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
// group exprs are the first expressions
let agg_expr = expr.split_off(group_expr.len());

Aggregate::try_new(Arc::new(inputs[0].clone()), expr, agg_expr)
Aggregate::try_new(Arc::new(inputs.swap_remove(0)), expr, agg_expr)
.map(LogicalPlan::Aggregate)
}
LogicalPlan::Sort(Sort { fetch, .. }) => Ok(LogicalPlan::Sort(Sort {
expr,
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
fetch: *fetch,
})),
LogicalPlan::Join(Join {
Expand All @@ -739,7 +739,7 @@ impl LogicalPlan {
// The first part of `expr` holds the equi-exprs,
// and each equi-expr has the form `left-expr = right-expr`.
assert_eq!(expr.len(), equi_expr_count);
let new_on:Vec<(Expr,Expr)> = expr.into_iter().map(|equi_expr| {
let new_on = expr.into_iter().map(|equi_expr| {
// SimplifyExpression rule may add alias to the equi_expr.
let unalias_expr = equi_expr.clone().unalias();
if let Expr::BinaryExpr(BinaryExpr { left, op: Operator::Eq, right }) = unalias_expr {
Expand All @@ -752,8 +752,8 @@ impl LogicalPlan {
}).collect::<Result<Vec<(Expr, Expr)>>>()?;

Ok(LogicalPlan::Join(Join {
left: Arc::new(inputs[0].clone()),
right: Arc::new(inputs[1].clone()),
left: Arc::new(inputs.swap_remove(0)),
right: Arc::new(inputs.swap_remove(0)),
join_type: *join_type,
join_constraint: *join_constraint,
on: new_on,
Expand All @@ -763,28 +763,28 @@ impl LogicalPlan {
}))
}
LogicalPlan::CrossJoin(_) => {
let left = inputs[0].clone();
let right = inputs[1].clone();
let left = inputs.swap_remove(0);
let right = inputs.swap_remove(0);
LogicalPlanBuilder::from(left).cross_join(right)?.build()
}
LogicalPlan::Subquery(Subquery {
outer_ref_columns, ..
}) => {
let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?;
let subquery = LogicalPlanBuilder::from(inputs.swap_remove(0)).build()?;
Ok(LogicalPlan::Subquery(Subquery {
subquery: Arc::new(subquery),
outer_ref_columns: outer_ref_columns.clone(),
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
SubqueryAlias::try_new(Arc::new(inputs[0].clone()), alias.clone())
SubqueryAlias::try_new(Arc::new(inputs.swap_remove(0)), alias.clone())
.map(LogicalPlan::SubqueryAlias)
}
LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
Ok(LogicalPlan::Limit(Limit {
skip: *skip,
fetch: *fetch,
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
}))
}
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
Expand All @@ -795,7 +795,7 @@ impl LogicalPlan {
..
})) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
constraints: Constraints::empty(),
name: name.clone(),
if_not_exists: *if_not_exists,
Expand All @@ -809,30 +809,30 @@ impl LogicalPlan {
definition,
..
})) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
name: name.clone(),
or_replace: *or_replace,
definition: definition.clone(),
}))),
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
node: e.node.from_template(&expr, inputs),
node: e.node.from_template(&expr, &inputs),
})),
LogicalPlan::Union(Union { schema, .. }) => {
let input_schema = inputs[0].schema();
// If the inputs are not pruned, do not change the schema.
let schema = if schema.fields().len() == input_schema.fields().len() {
schema
schema.clone()
} else {
input_schema
input_schema.clone()
};
Ok(LogicalPlan::Union(Union {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
inputs: inputs.into_iter().map(Arc::new).collect(),
schema,
}))
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(_) => Distinct::All(Arc::new(inputs[0].clone())),
Distinct::All(_) => Distinct::All(Arc::new(inputs.swap_remove(0))),
Distinct::On(DistinctOn {
on_expr,
select_expr,
Expand All @@ -848,7 +848,7 @@ impl LogicalPlan {
} else {
None
},
Arc::new(inputs[0].clone()),
Arc::new(inputs.swap_remove(0)),
)?)
}
};
Expand All @@ -858,8 +858,8 @@ impl LogicalPlan {
name, is_distinct, ..
}) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
name: name.clone(),
static_term: Arc::new(inputs[0].clone()),
recursive_term: Arc::new(inputs[1].clone()),
static_term: Arc::new(inputs.swap_remove(0)),
recursive_term: Arc::new(inputs.swap_remove(0)),
is_distinct: *is_distinct,
})),
LogicalPlan::Analyze(a) => {
Expand All @@ -868,7 +868,7 @@ impl LogicalPlan {
Ok(LogicalPlan::Analyze(Analyze {
verbose: a.verbose,
schema: a.schema.clone(),
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
}))
}
LogicalPlan::Explain(e) => {
Expand All @@ -879,7 +879,7 @@ impl LogicalPlan {
assert_eq!(inputs.len(), 1, "Invalid EXPLAIN command. Inputs are empty");
Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
plan: Arc::new(inputs[0].clone()),
plan: Arc::new(inputs.swap_remove(0)),
stringified_plans: e.stringified_plans.clone(),
schema: e.schema.clone(),
logical_optimization_succeeded: e.logical_optimization_succeeded,
Expand All @@ -890,7 +890,7 @@ impl LogicalPlan {
}) => Ok(LogicalPlan::Prepare(Prepare {
name: name.clone(),
data_types: data_types.clone(),
input: Arc::new(inputs[0].clone()),
input: Arc::new(inputs.swap_remove(0)),
})),
LogicalPlan::TableScan(ts) => {
assert!(inputs.is_empty(), "{self:?} should have no inputs");
Expand All @@ -915,7 +915,7 @@ impl LogicalPlan {
..
}) => {
// Update schema with unnested column type.
let input = Arc::new(inputs[0].clone());
let input = Arc::new(inputs.swap_remove(0));
let nested_field = input.schema().field_from_column(column)?;
let unnested_field = schema.field_from_column(column)?;
let fields = input
Expand Down Expand Up @@ -1199,7 +1199,7 @@ impl LogicalPlan {
.map(|inp| inp.replace_params_with_values(param_values))
.collect::<Result<Vec<_>>>()?;

self.with_new_exprs(new_exprs, &new_inputs_with_values)
self.with_new_exprs(new_exprs, new_inputs_with_values)
}

/// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/tree_node/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ impl TreeNode for LogicalPlan {

// If any child changed, build a new plan from the new children.
if old_children
.iter()
.into_iter()
.zip(new_children.iter())
.any(|(c1, c2)| c1 != &c2)
.any(|(c1, c2)| c1 != c2)
{
self.with_new_exprs(self.expressions(), new_children.as_slice())
self.with_new_exprs(self.expressions(), new_children)
} else {
Ok(self)
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ pub fn from_plan(
expr: &[Expr],
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
plan.with_new_exprs(expr.to_vec(), inputs)
plan.with_new_exprs(expr.to_vec(), inputs.to_vec())
}

/// Find all columns referenced from an aggregate query
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/analyzer/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn analyze_internal(plan: &LogicalPlan) -> Result<LogicalPlan> {
})
.collect::<Result<Vec<_>>>()?;

plan.with_new_exprs(new_expr, &new_inputs)
plan.with_new_exprs(new_expr, new_inputs)
}

pub(crate) struct OperatorToFunctionRewriter {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn analyze_internal(
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
// optimize child plans first
let new_inputs = plan
let mut new_inputs = plan
.inputs()
.iter()
.map(|p| analyze_internal(external_schema, p))
Expand Down Expand Up @@ -115,9 +115,9 @@ fn analyze_internal(
match &plan {
LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new(
new_expr,
Arc::new(new_inputs[0].clone()),
Arc::new(new_inputs.swap_remove(0)),
)?)),
_ => plan.with_new_exprs(new_expr, &new_inputs),
_ => plan.with_new_exprs(new_expr, new_inputs),
}
}

Expand Down
5 changes: 2 additions & 3 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ impl OptimizerRule for EliminateOuterJoin {
schema: join.schema.clone(),
null_equals_null: join.null_equals_null,
});
let new_plan =
plan.with_new_exprs(plan.expressions(), &[new_join])?;
Ok(Some(new_plan))
let exprs = plan.expressions();
plan.with_new_exprs(exprs, vec![new_join]).map(Some)
}
_ => Ok(None),
},
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ fn optimize_projections(
// If `new_input` is `None`, the child is unchanged, so use
// `old_child` during construction:
.map(|(new_input, old_child)| new_input.unwrap_or_else(|| old_child.clone()))
.collect::<Vec<_>>();
plan.with_new_exprs(plan.expressions(), &new_inputs)
.map(Some)
.collect();
let exprs = plan.expressions();
plan.with_new_exprs(exprs, new_inputs).map(Some)
}
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,10 @@ impl Optimizer {
Some(plan) => plan,
None => old_plan.clone(),
})
.collect::<Vec<_>>();
.collect();

Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
let exprs = plan.expressions();
plan.with_new_exprs(exprs, new_inputs).map(Some)
}

/// Use a rule to optimize the whole plan.
Expand Down
Loading

0 comments on commit ee9736f

Please sign in to comment.