diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d16dfb140353..8943ae237afd 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -913,14 +913,19 @@ impl LogicalPlan { param_values: impl Into, ) -> Result { let param_values = param_values.into(); - match self { - LogicalPlan::Prepare(prepare_lp) => { - param_values.verify(&prepare_lp.data_types)?; - let input_plan = prepare_lp.input; - input_plan.replace_params_with_values(¶m_values) + let plan_with_values = self.replace_params_with_values(¶m_values)?; + + // unwrap Prepare + Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values { + param_values.verify(&prepare_lp.data_types)?; + // try and take ownership of the input if is not shared, clone otherwise + match Arc::try_unwrap(prepare_lp.input) { + Ok(input) => input, + Err(arc_input) => arc_input.as_ref().clone(), } - _ => self.replace_params_with_values(¶m_values), - } + } else { + plan_with_values + }) } /// Returns the maximum number of rows that this plan can output, if known. @@ -1046,27 +1051,26 @@ impl LogicalPlan { /// ...) replaced with corresponding values provided in /// `params_values` /// - /// See [`Self::with_param_values`] for examples and usage + /// See [`Self::with_param_values`] for examples and usage with an owned + /// `ParamValues` pub fn replace_params_with_values( - &self, + self, param_values: &ParamValues, ) -> Result { - let new_exprs = self - .expressions() - .into_iter() - .map(|e| { - let e = e.infer_placeholder_types(self.schema())?; - Self::replace_placeholders_with_values(e, param_values) + self.transform_up_with_subqueries(&|plan| { + let schema = plan.schema().clone(); + plan.map_expressions(|e| { + e.infer_placeholder_types(&schema)?.transform_up(&|e| { + if let Expr::Placeholder(Placeholder { id, .. }) = e { + let value = param_values.get_placeholders_with_values(&id)?; + Ok(Transformed::yes(Expr::Literal(value))) + } else { + Ok(Transformed::no(e)) + } + }) }) - .collect::>>()?; - - let new_inputs_with_values = self - .inputs() - .into_iter() - .map(|inp| inp.replace_params_with_values(param_values)) - .collect::>>()?; - - self.with_new_exprs(new_exprs, new_inputs_with_values) + }) + .map(|res| res.data) } /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes @@ -1099,33 +1103,6 @@ impl LogicalPlan { .map(|_| param_types) } - /// Return an Expr with all placeholders replaced with their - /// corresponding values provided in the params_values - fn replace_placeholders_with_values( - expr: Expr, - param_values: &ParamValues, - ) -> Result { - expr.transform(&|expr| { - match &expr { - Expr::Placeholder(Placeholder { id, .. }) => { - let value = param_values.get_placeholders_with_values(id)?; - // Replace the placeholder with the value - Ok(Transformed::yes(Expr::Literal(value))) - } - Expr::ScalarSubquery(qry) => { - let subquery = - Arc::new(qry.subquery.replace_params_with_values(param_values)?); - Ok(Transformed::yes(Expr::ScalarSubquery(Subquery { - subquery, - outer_ref_columns: qry.outer_ref_columns.clone(), - }))) - } - _ => Ok(Transformed::no(expr)), - } - }) - .data() - } - // ------------ // Various implementations for printing out LogicalPlans // ------------