Skip to content

Commit

Permalink
Stop copying plans in LogicalPlan::with_param_values
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Apr 9, 2024
1 parent 57b2656 commit 5743c2a
Showing 1 changed file with 37 additions and 49 deletions.
86 changes: 37 additions & 49 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,14 +1210,19 @@ impl LogicalPlan {
param_values: impl Into<ParamValues>,
) -> Result<LogicalPlan> {
let param_values = param_values.into();
match self {
LogicalPlan::Prepare(prepare_lp) => {
param_values.verify(&prepare_lp.data_types)?;
let input_plan = prepare_lp.input;
input_plan.replace_params_with_values(&param_values)
let plan_with_values = self.replace_params_with_values(&param_values)?;

// unwrap Prepare
Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values {
param_values.verify(&prepare_lp.data_types)?;
// try and take ownership of the input if is not shared, clone otherwise
match Arc::try_unwrap(prepare_lp.input) {
Ok(input) => input,
Err(arc_input) => arc_input.as_ref().clone(),
}
_ => self.replace_params_with_values(&param_values),
}
} else {
plan_with_values
})
}

/// Returns the maximum number of rows that this plan can output, if known.
Expand Down Expand Up @@ -1528,25 +1533,35 @@ impl LogicalPlan {
///
/// See [`Self::with_param_values`] for examples and usage
pub fn replace_params_with_values(
&self,
self,
param_values: &ParamValues,
) -> Result<LogicalPlan> {
let new_exprs = self
.expressions()
.into_iter()
.map(|e| {
let e = e.infer_placeholder_types(self.schema())?;
Self::replace_placeholders_with_values(e, param_values)
})
.collect::<Result<Vec<_>>>()?;
struct ParamRewriter<'a> {
param_values: &'a ParamValues,
}

let new_inputs_with_values = self
.inputs()
.into_iter()
.map(|inp| inp.replace_params_with_values(param_values))
.collect::<Result<Vec<_>>>()?;
impl<'a> TreeNodeRewriter for ParamRewriter<'a> {
type Node = LogicalPlan;

fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
let schema = node.schema().clone();
node.map_expressions(|e| {
let e = e.infer_placeholder_types(&schema)?;
if let Expr::Placeholder(Placeholder { id, .. }) = e {
let value =
self.param_values.get_placeholders_with_values(&id)?;
// Replace the placeholder with the value
Ok(Transformed::yes(Expr::Literal(value)))
} else {
Ok(Transformed::no(e))
}
})
}
}

self.with_new_exprs(new_exprs, new_inputs_with_values)
Ok(self
.rewrite_with_subqueries(&mut ParamRewriter { param_values })?
.data)
}

/// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
Expand Down Expand Up @@ -1578,33 +1593,6 @@ impl LogicalPlan {
})
.map(|_| param_types)
}

/// Return an Expr with all placeholders replaced with their
/// corresponding values provided in the params_values
fn replace_placeholders_with_values(
expr: Expr,
param_values: &ParamValues,
) -> Result<Expr> {
expr.transform(&|expr| {
match &expr {
Expr::Placeholder(Placeholder { id, .. }) => {
let value = param_values.get_placeholders_with_values(id)?;
// Replace the placeholder with the value
Ok(Transformed::yes(Expr::Literal(value)))
}
Expr::ScalarSubquery(qry) => {
let subquery =
Arc::new(qry.subquery.replace_params_with_values(param_values)?);
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns: qry.outer_ref_columns.clone(),
})))
}
_ => Ok(Transformed::no(expr)),
}
})
.data()
}
}

// Various implementations for printing out LogicalPlans
Expand Down

0 comments on commit 5743c2a

Please sign in to comment.