Skip to content

Commit

Permalink
fix: schema error when parsing order-by expressions (#10234)
Browse files Browse the repository at this point in the history
* fix: schema error when parsing order-by expressions

* add test from issue

* improve order_by_to_sort_expr

* add test

* Update datafusion/sqllogictest/test_files/order.slt

Co-authored-by: Andrew Lamb <[email protected]>

* fix tests

* add test

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
jonahgao and alamb authored May 1, 2024
1 parent 39d9323 commit d3237b2
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 50 deletions.
19 changes: 15 additions & 4 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context,
// Numeric literals in window function ORDER BY are treated as constants
false,
None,
)?;

let func_deps = schema.functional_dependencies();
Expand Down Expand Up @@ -219,8 +220,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} else {
// User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function
if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
let order_by =
self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?;
let order_by = self.order_by_to_sort_expr(
&order_by,
schema,
planner_context,
true,
None,
)?;
let order_by = (!order_by.is_empty()).then_some(order_by);
let args = self.function_args_to_expr(args, schema, planner_context)?;
// TODO: Support filter and distinct for UDAFs
Expand All @@ -236,8 +242,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// next, aggregate built-ins
if let Ok(fun) = AggregateFunction::from_str(&name) {
let order_by =
self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?;
let order_by = self.order_by_to_sort_expr(
&order_by,
schema,
planner_context,
true,
None,
)?;
let order_by = (!order_by.is_empty()).then_some(order_by);
let args = self.function_args_to_expr(args, schema, planner_context)?;
let filter: Option<Box<Expr>> = filter
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input_schema,
planner_context,
true,
None,
)?)
} else {
None
Expand Down
43 changes: 36 additions & 7 deletions datafusion/sql/src/expr/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,39 @@ use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// Convert sql [OrderByExpr] to `Vec<Expr>`.
///
/// If `literal_to_column` is true, treat any numeric literals (e.g. `2`) as a 1 based index
/// into the SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`).
/// `input_schema` and `additional_schema` are used to resolve column references in the order-by expressions.
/// `input_schema` is the schema of the input logical plan, typically derived from the SELECT list.
///
/// Usually order-by expressions can only reference the input plan's columns.
/// But the `SELECT ... FROM ... ORDER BY ...` syntax is a special case. Besides the input schema,
/// it can reference an `additional_schema` derived from the `FROM` clause.
///
/// If `literal_to_column` is true, treat any numeric literals (e.g. `2`) as a 1 based index into the
/// SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`). Literals only reference the `input_schema`.
///
/// If false, interpret numeric literals as constant values.
pub(crate) fn order_by_to_sort_expr(
&self,
exprs: &[OrderByExpr],
schema: &DFSchema,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
literal_to_column: bool,
additional_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
if exprs.is_empty() {
return Ok(vec![]);
}

let mut combined_schema;
let order_by_schema = match additional_schema {
Some(schema) => {
combined_schema = input_schema.clone();
combined_schema.merge(schema);
&combined_schema
}
None => input_schema,
};

let mut expr_vec = vec![];
for e in exprs {
let OrderByExpr {
Expand All @@ -52,17 +75,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return plan_err!(
"Order by index starts at 1 for column indexes"
);
} else if schema.fields().len() < field_index {
} else if input_schema.fields().len() < field_index {
return plan_err!(
"Order by column out of bounds, specified: {}, max: {}",
field_index,
schema.fields().len()
input_schema.fields().len()
);
}

Expr::Column(Column::from(schema.qualified_field(field_index - 1)))
Expr::Column(Column::from(
input_schema.qualified_field(field_index - 1),
))
}
e => self.sql_expr_to_logical_expr(e.clone(), schema, planner_context)?,
e => self.sql_expr_to_logical_expr(
e.clone(),
order_by_schema,
planner_context,
)?,
};
let asc = asc.unwrap_or(true);
expr_vec.push(Expr::Sort(Sort::new(
Expand Down
79 changes: 51 additions & 28 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use datafusion_expr::{
Operator,
};
use sqlparser::ast::{
Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, Value,
Expr as SQLExpr, Offset as SQLOffset, Query, SelectInto, SetExpr, Value,
};

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand All @@ -46,29 +46,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
query: Query,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let mut set_expr = query.body;
if let Some(with) = query.with {
self.plan_with_clause(with, planner_context)?;
}
// Take the `SelectInto` for later processing.
let select_into = match set_expr.as_mut() {
SetExpr::Select(select) => select.into.take(),
_ => None,
};
let plan = self.set_expr_to_plan(*set_expr, planner_context)?;
let plan = self.order_by(plan, query.order_by, planner_context)?;
let mut plan = self.limit(plan, query.offset, query.limit)?;
if let Some(into) = select_into {
plan = LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
name: self.object_name_to_table_reference(into.name)?,
constraints: Constraints::empty(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
column_defaults: vec![],
}))

let set_expr = *query.body;
match set_expr {
SetExpr::Select(mut select) => {
let select_into = select.into.take();
// Order-by expressions may refer to columns in the `FROM` clause,
// so we need to process `SELECT` and `ORDER BY` together.
let plan =
self.select_to_plan(*select, query.order_by, planner_context)?;
let plan = self.limit(plan, query.offset, query.limit)?;
// Process the `SELECT INTO` after `LIMIT`.
self.select_into(plan, select_into)
}
other => {
let plan = self.set_expr_to_plan(other, planner_context)?;
let order_by_rex = self.order_by_to_sort_expr(
&query.order_by,
plan.schema(),
planner_context,
true,
None,
)?;
let plan = self.order_by(plan, order_by_rex)?;
self.limit(plan, query.offset, query.limit)
}
}
Ok(plan)
}

/// Wrap a plan in a limit
Expand Down Expand Up @@ -114,26 +120,43 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

/// Wrap the logical in a sort
fn order_by(
pub(super) fn order_by(
&self,
plan: LogicalPlan,
order_by: Vec<OrderByExpr>,
planner_context: &mut PlannerContext,
order_by: Vec<Expr>,
) -> Result<LogicalPlan> {
if order_by.is_empty() {
return Ok(plan);
}

let order_by_rex =
self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?;

if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
// In case of `DISTINCT ON` we must capture the sort expressions since during the plan
// optimization we're effectively doing a `first_value` aggregation according to them.
let distinct_on = distinct_on.clone().with_sort_expr(order_by_rex)?;
let distinct_on = distinct_on.clone().with_sort_expr(order_by)?;
Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
} else {
LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
LogicalPlanBuilder::from(plan).sort(order_by)?.build()
}
}

/// Wrap the logical plan in a `SelectInto`
fn select_into(
&self,
plan: LogicalPlan,
select_into: Option<SelectInto>,
) -> Result<LogicalPlan> {
match select_into {
Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(into.name)?,
constraints: Constraints::empty(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
column_defaults: vec![],
},
))),
_ => Ok(plan),
}
}
}
Expand Down
24 changes: 19 additions & 5 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{Column, UnnestOptions};
use datafusion_expr::expr::{Alias, Unnest};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols,
};
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
Expand All @@ -39,8 +39,8 @@ use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, ReplaceSelectItem, WildcardAdditionalOptions,
WindowType,
Distinct, Expr as SQLExpr, GroupByExpr, OrderByExpr, ReplaceSelectItem,
WildcardAdditionalOptions, WindowType,
};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};

Expand All @@ -49,6 +49,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn select_to_plan(
&self,
mut select: Select,
order_by: Vec<OrderByExpr>,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
// check for unsupported syntax first
Expand Down Expand Up @@ -94,6 +95,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut combined_schema = base_plan.schema().as_ref().clone();
combined_schema.merge(projected_plan.schema());

// Order-by expressions prioritize referencing columns from the select list,
// then from the FROM clause.
let order_by_rex = self.order_by_to_sort_expr(
&order_by,
projected_plan.schema().as_ref(),
planner_context,
true,
Some(base_plan.schema().as_ref()),
)?;
let order_by_rex = normalize_cols(order_by_rex, &projected_plan)?;

// this alias map is resolved and looked up in both having exprs and group by exprs
let alias_map = extract_aliases(&select_exprs);

Expand Down Expand Up @@ -248,9 +260,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<Vec<_>>>()?;

// Build the final plan
return LogicalPlanBuilder::from(base_plan)
LogicalPlanBuilder::from(base_plan)
.distinct_on(on_expr, select_exprs, None)?
.build();
.build()
}
}?;

Expand All @@ -274,6 +286,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan
};

let plan = self.order_by(plan, order_by_rex)?;

Ok(plan)
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
match set_expr {
SetExpr::Select(s) => self.select_to_plan(*s, planner_context),
SetExpr::Select(s) => self.select_to_plan(*s, vec![], planner_context),
SetExpr::Values(v) => self.sql_values_to_plan(v, planner_context),
SetExpr::SetOperation {
op,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
for expr in order_exprs {
// Convert each OrderByExpr to a SortExpr:
let expr_vec =
self.order_by_to_sort_expr(&expr, schema, planner_context, true)?;
self.order_by_to_sort_expr(&expr, schema, planner_context, true, None)?;
// Verify that columns of all SortExprs exist in the schema:
for expr in expr_vec.iter() {
for column in expr.to_columns()?.iter() {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3646,7 +3646,7 @@ fn test_select_distinct_order_by() {
let sql = "SELECT distinct '1' from person order by id";

let expected =
"Error during planning: For SELECT DISTINCT, ORDER BY expressions id must appear in select list";
"Error during planning: For SELECT DISTINCT, ORDER BY expressions person.id must appear in select list";

// It should return error.
let result = logical_plan(sql);
Expand Down
Loading

0 comments on commit d3237b2

Please sign in to comment.