Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop copying plans in LogicalPlan::with_param_values #10016

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 28 additions & 51 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,14 +913,19 @@ impl LogicalPlan {
param_values: impl Into<ParamValues>,
) -> Result<LogicalPlan> {
let param_values = param_values.into();
match self {
LogicalPlan::Prepare(prepare_lp) => {
param_values.verify(&prepare_lp.data_types)?;
let input_plan = prepare_lp.input;
input_plan.replace_params_with_values(&param_values)
let plan_with_values = self.replace_params_with_values(&param_values)?;

// unwrap Prepare
Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values {
param_values.verify(&prepare_lp.data_types)?;
// try and take ownership of the input if is not shared, clone otherwise
match Arc::try_unwrap(prepare_lp.input) {
Ok(input) => input,
Err(arc_input) => arc_input.as_ref().clone(),
}
_ => self.replace_params_with_values(&param_values),
}
} else {
plan_with_values
})
}

/// Returns the maximum number of rows that this plan can output, if known.
Expand Down Expand Up @@ -1046,27 +1051,26 @@ impl LogicalPlan {
/// ...) replaced with corresponding values provided in
/// `params_values`
///
/// See [`Self::with_param_values`] for examples and usage
/// See [`Self::with_param_values`] for examples and usage with an owned
/// `ParamValues`
pub fn replace_params_with_values(
&self,
self,
Copy link
Contributor Author

@alamb alamb Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the & is the API change -- this allows avoiding a clone

param_values: &ParamValues,
) -> Result<LogicalPlan> {
let new_exprs = self
.expressions()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expressions clones Exprs

.into_iter()
.map(|e| {
let e = e.infer_placeholder_types(self.schema())?;
Self::replace_placeholders_with_values(e, param_values)
self.transform_up_with_subqueries(&|plan| {
let schema = plan.schema().clone();
plan.map_expressions(|e| {
e.infer_placeholder_types(&schema)?.transform_up(&|e| {
if let Expr::Placeholder(Placeholder { id, .. }) = e {
let value = param_values.get_placeholders_with_values(&id)?;
Ok(Transformed::yes(Expr::Literal(value)))
} else {
Ok(Transformed::no(e))
}
})
})
.collect::<Result<Vec<_>>>()?;

let new_inputs_with_values = self
.inputs()
.into_iter()
.map(|inp| inp.replace_params_with_values(param_values))
.collect::<Result<Vec<_>>>()?;

self.with_new_exprs(new_exprs, new_inputs_with_values)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new_with_exprs also copies all the other fields of the plan, so removing it results in fewer copies too

})
.map(|res| res.data)
}

/// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
Expand Down Expand Up @@ -1099,33 +1103,6 @@ impl LogicalPlan {
.map(|_| param_types)
}

/// Return an Expr with all placeholders replaced with their
/// corresponding values provided in the params_values
fn replace_placeholders_with_values(
expr: Expr,
param_values: &ParamValues,
) -> Result<Expr> {
expr.transform(&|expr| {
Copy link
Contributor Author

@alamb alamb Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rewrite_with_subqueries handles recursing into the subqueries so this is no longer necessary

match &expr {
Expr::Placeholder(Placeholder { id, .. }) => {
let value = param_values.get_placeholders_with_values(id)?;
// Replace the placeholder with the value
Ok(Transformed::yes(Expr::Literal(value)))
}
Expr::ScalarSubquery(qry) => {
let subquery =
Arc::new(qry.subquery.replace_params_with_values(param_values)?);
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns: qry.outer_ref_columns.clone(),
})))
}
_ => Ok(Transformed::no(expr)),
}
})
.data()
}

// ------------
// Various implementations for printing out LogicalPlans
// ------------
Expand Down
Loading