Introduce user defined SQL planner API #11180

Merged
merged 19 commits into from
Jul 2, 2024
21 changes: 14 additions & 7 deletions datafusion/expr/src/planner.rs
@@ -114,8 +114,12 @@ pub trait UserDefinedSQLPlanner {

/// An operator with two arguments to plan
///
- /// Note `left` and `right` are DataFusion [`Expr`]s but the `op` is the SQL AST operator.
- /// This structure is used by [`UserDefinedSQLPlanner`] to plan operators with custom expressions.
+ /// Note `left` and `right` are DataFusion [`Expr`]s but the `op` is the SQL AST
+ /// operator.
+ ///
+ /// This structure is used by [`UserDefinedSQLPlanner`] to plan operators with
+ /// custom expressions.
+ #[derive(Debug, Clone)]
Contributor Author

I think we don't need Clone 🤔 The same goes for RawFieldAccessExpr and PlannerResult. Everything else looks good to me.

Contributor

I agree there isn't a critical use case for it now, but I figured it didn't hurt. If you feel strongly, I will remove Clone.

Contributor Author

I think it is not a critical issue.
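For context on why `Clone` is optional here: the planning loop moves the raw expression into each planner and receives it back through `PlannerResult::Original`, so one owned value can be threaded through the whole chain. Below is a minimal, self-contained sketch of that ownership round-trip. It uses stand-in types (`RawBinary`, a `String` in place of `Expr`, and a hypothetical `ConcatPlanner`), not the DataFusion definitions, and the raw struct deliberately does not derive `Clone`.

```rust
// Stand-in types for illustration only; these are NOT the DataFusion definitions.
#[derive(Debug, PartialEq)] // note: no Clone
pub struct RawBinary {
    pub op: String,
    pub left: String,
    pub right: String,
}

/// Mirrors the shape of the enum in this PR, with `String` standing in for `Expr`.
#[derive(Debug, PartialEq)]
pub enum PlannerResult<T> {
    Planned(String),
    Original(T),
}

pub trait Planner {
    fn plan_binary_op(&self, expr: RawBinary) -> PlannerResult<RawBinary>;
}

/// Hypothetical planner that only understands the `||` operator.
pub struct ConcatPlanner;

impl Planner for ConcatPlanner {
    fn plan_binary_op(&self, expr: RawBinary) -> PlannerResult<RawBinary> {
        if expr.op == "||" {
            PlannerResult::Planned(format!("concat({}, {})", expr.left, expr.right))
        } else {
            // Hand ownership back untouched so the next planner can try.
            PlannerResult::Original(expr)
        }
    }
}

/// Threads one owned value through every planner without cloning it.
/// Returns the unplanned input as the error if no planner handled it.
pub fn plan(planners: &[Box<dyn Planner>], mut expr: RawBinary) -> Result<String, RawBinary> {
    for planner in planners {
        match planner.plan_binary_op(expr) {
            PlannerResult::Planned(planned) => return Ok(planned),
            PlannerResult::Original(original) => expr = original,
        }
    }
    Err(expr)
}
```

Because each planner either consumes the value or returns it, the loop compiles without any `Clone` bound. Deriving `Clone` is then a convenience for callers rather than a requirement of the API.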

pub struct RawBinaryExpr {
pub op: sqlparser::ast::BinaryOperator,
pub left: Expr,
@@ -124,16 +128,19 @@ pub struct RawBinaryExpr {

/// An expression with GetFieldAccess to plan
///
- /// This structure is used by [`UserDefinedSQLPlanner`] to plan operators with custom expressions.
+ /// This structure is used by [`UserDefinedSQLPlanner`] to plan operators with
+ /// custom expressions.
+ #[derive(Debug, Clone)]
pub struct RawFieldAccessExpr {
pub field_access: GetFieldAccess,
pub expr: Expr,
}

+ /// Result of planning a raw expr with [`UserDefinedSQLPlanner`]
+ #[derive(Debug, Clone)]
pub enum PlannerResult<T> {
- /// The function call was simplified to an entirely new Expr
- Simplified(Expr),
- /// the function call could not be simplified, and the arguments
- /// are return unmodified.
+ /// The raw expression was successfully planned as a new [`Expr`]
+ Planned(Expr),
+ /// The raw expression could not be planned, and is returned unmodified
Original(T),
}
20 changes: 10 additions & 10 deletions datafusion/functions-array/src/planner.rs
@@ -60,11 +60,11 @@ impl UserDefinedSQLPlanner for ArrayFunctionPlanner {
// TODO: concat function ignore null, but string concat takes null into consideration
// we can rewrite it to concat if we can configure the behaviour of concat function to the one like `string concat operator`
} else if left_list_ndims == right_list_ndims {
- return Ok(PlannerResult::Simplified(array_concat(vec![left, right])));
+ return Ok(PlannerResult::Planned(array_concat(vec![left, right])));
} else if left_list_ndims > right_list_ndims {
- return Ok(PlannerResult::Simplified(array_append(left, right)));
+ return Ok(PlannerResult::Planned(array_append(left, right)));
} else if left_list_ndims < right_list_ndims {
- return Ok(PlannerResult::Simplified(array_prepend(left, right)));
+ return Ok(PlannerResult::Planned(array_prepend(left, right)));
}
} else if matches!(
op,
@@ -79,10 +79,10 @@ impl UserDefinedSQLPlanner for ArrayFunctionPlanner {
if left_list_ndims > 0 && right_list_ndims > 0 {
if op == sqlparser::ast::BinaryOperator::AtArrow {
// array1 @> array2 -> array_has_all(array1, array2)
- return Ok(PlannerResult::Simplified(array_has_all(left, right)));
+ return Ok(PlannerResult::Planned(array_has_all(left, right)));
} else {
// array1 <@ array2 -> array_has_all(array2, array1)
- return Ok(PlannerResult::Simplified(array_has_all(right, left)));
+ return Ok(PlannerResult::Planned(array_has_all(right, left)));
}
}
}
@@ -95,7 +95,7 @@ impl UserDefinedSQLPlanner for ArrayFunctionPlanner {
exprs: Vec<Expr>,
_schema: &DFSchema,
) -> Result<PlannerResult<Vec<Expr>>> {
- Ok(PlannerResult::Simplified(make_array(exprs)))
+ Ok(PlannerResult::Planned(make_array(exprs)))
}
}
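The `||` branch in this file picks an array function by comparing the list nesting depth of the two operands. A rough sketch of that decision as a pure function follows. `ConcatPlan` and `plan_concat` are hypothetical names introduced for illustration, and the first branch (neither side is a list) is an assumption, since the diff elides that condition.

```rust
/// Which array function a `left || right` pair would be planned as.
/// Hypothetical type for illustration; not part of DataFusion.
#[derive(Debug, PartialEq)]
pub enum ConcatPlan {
    /// Same depth on both sides: array_concat(vec![left, right])
    ArrayConcat,
    /// Left is nested deeper: array_append(left, right)
    ArrayAppend,
    /// Right is nested deeper: array_prepend(left, right)
    ArrayPrepend,
    /// Not handled by the array planner; left for another planner.
    NotAnArrayOp,
}

pub fn plan_concat(left_list_ndims: usize, right_list_ndims: usize) -> ConcatPlan {
    if left_list_ndims == 0 && right_list_ndims == 0 {
        // ASSUMPTION: the elided first branch covers "neither side is a list",
        // in which case `||` stays an ordinary string concatenation.
        ConcatPlan::NotAnArrayOp
    } else if left_list_ndims == right_list_ndims {
        ConcatPlan::ArrayConcat
    } else if left_list_ndims > right_list_ndims {
        ConcatPlan::ArrayAppend
    } else {
        ConcatPlan::ArrayPrepend
    }
}
```

For instance, a one-dimensional list on the left with a scalar on the right has depths `(1, 0)` and maps to the append case, while `(0, 1)` maps to prepend.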

@@ -113,14 +113,14 @@ impl UserDefinedSQLPlanner for FieldAccessPlanner {
match field_access {
// expr["field"] => get_field(expr, "field")
GetFieldAccess::NamedStructField { name } => {
- Ok(PlannerResult::Simplified(get_field(expr, name)))
+ Ok(PlannerResult::Planned(get_field(expr, name)))
}
// expr[idx] ==> array_element(expr, idx)
GetFieldAccess::ListIndex { key: index } => {
match expr {
// Special case for array_agg(expr)[index] to NTH_VALUE(expr, index)
Expr::AggregateFunction(agg_func) if is_array_agg(&agg_func) => {
- Ok(PlannerResult::Simplified(Expr::AggregateFunction(
+ Ok(PlannerResult::Planned(Expr::AggregateFunction(
datafusion_expr::expr::AggregateFunction::new(
AggregateFunction::NthValue,
agg_func
@@ -135,15 +135,15 @@ impl UserDefinedSQLPlanner for FieldAccessPlanner {
),
)))
}
- _ => Ok(PlannerResult::Simplified(array_element(expr, *index))),
+ _ => Ok(PlannerResult::Planned(array_element(expr, *index))),
}
}
// expr[start, stop, stride] ==> array_slice(expr, start, stop, stride)
GetFieldAccess::ListRange {
start,
stop,
stride,
- } => Ok(PlannerResult::Simplified(array_slice(
+ } => Ok(PlannerResult::Planned(array_slice(
expr,
*start,
*stop,
6 changes: 3 additions & 3 deletions datafusion/sql/src/expr/mod.rs
@@ -109,7 +109,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut binary_expr = datafusion_expr::planner::RawBinaryExpr { op, left, right };
for planner in self.planners.iter() {
match planner.plan_binary_op(binary_expr, schema)? {
- PlannerResult::Simplified(expr) => {
+ PlannerResult::Planned(expr) => {
return Ok(expr);
}
PlannerResult::Original(expr) => {
@@ -281,14 +281,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
for planner in self.planners.iter() {
match planner.plan_field_access(field_access_expr, schema)? {
- PlannerResult::Simplified(expr) => return Ok(expr),
+ PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(expr) => {
field_access_expr = expr;
}
}
}

- internal_err!("Expected a simplified result, but none was found")
+ not_impl_err!("GetFieldAccess not supported by UserDefinedExtensionPlanners: {field_access_expr:?}")
Contributor

👋 I'm curious about this default branch because I'm seeing this error when doing integration testing with the delta-rs package. To the best of my knowledge we're not using any extension planners, but now this is failing with the latest datafusion. I'm uncertain whether we're doing something wrong, or if the default behavior of falling through to not_impl_err is causing us grief

Contributor

Hi @rtyler -- looks to me like the delta-rs code is managing its own SessionState -- like in https://github.com/delta-io/delta-rs/blob/main/crates/core/src/delta_datafusion/mod.rs#L1687 (BTW the Delta API is really nicely thought out)

So I think you'll need to register the same planners in your SessionContext 🤔

Helpfully I think @Omega359 just made a PR to make this easier: #11296

I feel in general DataFusion is hard to use / configure correctly if you are using a custom SessionState / configuration -- one potential thing we were discussing is #11182 (comment) -- I'll file a ticket to make this a thing.
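To make the failure mode in this thread concrete, here is a small self-contained model of it. A state built by hand starts with no planners, so the loop has nothing to try and falls through to the "not supported" error. Registering a planner is what makes field access work. Everything in this sketch (`State`, `with_planner`, `get_field_planner`, the `String` stand-in for `Expr`) is hypothetical and only mimics the shape of the code in this diff. It is not the DataFusion `SessionState` API.

```rust
/// Stand-in for a raw `expr[field]` access; not the DataFusion type.
#[derive(Debug)]
pub struct RawFieldAccess {
    pub expr: String,
    pub field: String,
}

pub enum PlannerResult {
    Planned(String),
    Original(RawFieldAccess),
}

/// A planner is modeled as a boxed function for brevity.
pub type Planner = Box<dyn Fn(RawFieldAccess) -> PlannerResult>;

/// Hypothetical session state that owns the list of registered planners.
pub struct State {
    planners: Vec<Planner>,
}

impl State {
    /// A hand-built state: nothing is registered unless the caller does it.
    pub fn empty() -> Self {
        State { planners: Vec::new() }
    }

    /// Registers one planner and returns the updated state.
    pub fn with_planner(mut self, planner: Planner) -> Self {
        self.planners.push(planner);
        self
    }

    /// Same control flow as the loop in the diff: first `Planned` wins,
    /// otherwise fall through to a "not supported" error.
    pub fn plan_field_access(&self, mut raw: RawFieldAccess) -> Result<String, String> {
        for planner in &self.planners {
            match planner(raw) {
                PlannerResult::Planned(expr) => return Ok(expr),
                PlannerResult::Original(original) => raw = original,
            }
        }
        Err(format!("GetFieldAccess not supported: {raw:?}"))
    }
}

/// Hypothetical planner that rewrites every access to a `get_field` call.
pub fn get_field_planner() -> Planner {
    Box::new(|raw: RawFieldAccess| {
        PlannerResult::Planned(format!("get_field({}, {})", raw.expr, raw.field))
    })
}
```

In this model, `State::empty().plan_field_access(..)` returns the error, while the same call on `State::empty().with_planner(get_field_planner())` succeeds, which matches the symptom of a custom state that never registered the planners a default context would carry.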

Member

I think I had a similar issue to yours, @rtyler. I found a workaround, described here: #11477

Contributor

Proposed fix is here: #11485

}

SQLExpr::CompoundIdentifier(ids) => {
2 changes: 1 addition & 1 deletion datafusion/sql/src/expr/value.rs
@@ -156,7 +156,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut exprs = values;
for planner in self.planners.iter() {
match planner.plan_array_literal(exprs, schema)? {
- PlannerResult::Simplified(expr) => {
+ PlannerResult::Planned(expr) => {
return Ok(expr);
}
PlannerResult::Original(values) => exprs = values,