Skip to content

Commit

Permalink
fix: RANGE frame for corner cases with empty ORDER BY clause should b…
Browse files Browse the repository at this point in the history
…e treated as constant sort (#8445)

* fix: RANGE frame for corner cases with empty ORDER BY clause should be treated as constant sort

* fix

* Make the test not flaky

* fix clippy
  • Loading branch information
viirya authored Dec 8, 2023
1 parent c0c9e88 commit 205e315
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 34 deletions.
56 changes: 37 additions & 19 deletions datafusion/expr/src/window_frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
//! - An ending frame boundary,
//! - An EXCLUDE clause.
use crate::expr::Sort;
use crate::Expr;
use datafusion_common::{plan_err, sql_err, DataFusionError, Result, ScalarValue};
use sqlparser::ast;
use sqlparser::parser::ParserError::ParserError;
Expand Down Expand Up @@ -142,41 +144,57 @@ impl WindowFrame {
}
}

/// Construct equivalent explicit window frames for implicit corner cases.
/// With this processing, we may assume in downstream code that RANGE/GROUPS
/// frames contain an appropriate ORDER BY clause.
pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> Result<WindowFrame> {
if frame.units == WindowFrameUnits::Range && order_bys != 1 {
/// Regularizes ORDER BY clause for window definition for implicit corner cases.
pub fn regularize_window_order_by(
frame: &WindowFrame,
order_by: &mut Vec<Expr>,
) -> Result<()> {
if frame.units == WindowFrameUnits::Range && order_by.len() != 1 {
// Normally, RANGE frames require an ORDER BY clause with exactly one
// column. However, an ORDER BY clause may be absent or present but with
// more than one column in two edge cases:
// 1. start bound is UNBOUNDED or CURRENT ROW
// 2. end bound is CURRENT ROW or UNBOUNDED.
// In these cases, we regularize the RANGE frame to be equivalent to a ROWS
// frame with the UNBOUNDED bounds.
// Note that this follows Postgres behavior.
// In these cases, we regularize the ORDER BY clause if the ORDER BY clause
// is absent. If an ORDER BY clause is present but has more than one column,
// the ORDER BY clause is unchanged. Note that this follows Postgres behavior.
if (frame.start_bound.is_unbounded()
|| frame.start_bound == WindowFrameBound::CurrentRow)
&& (frame.end_bound == WindowFrameBound::CurrentRow
|| frame.end_bound.is_unbounded())
{
// If an ORDER BY clause is absent, the frame is equivalent to a ROWS
// frame with the UNBOUNDED bounds.
// If an ORDER BY clause is present but has more than one column, the
// frame is unchanged.
if order_bys == 0 {
frame.units = WindowFrameUnits::Rows;
frame.start_bound =
WindowFrameBound::Preceding(ScalarValue::UInt64(None));
frame.end_bound = WindowFrameBound::Following(ScalarValue::UInt64(None));
// If an ORDER BY clause is absent, it is equivalent to a ORDER BY clause
// with constant value as sort key.
// If an ORDER BY clause is present but has more than one column, it is
// unchanged.
if order_by.is_empty() {
order_by.push(Expr::Sort(Sort::new(
Box::new(Expr::Literal(ScalarValue::UInt64(Some(1)))),
true,
false,
)));
}
} else {
}
}
Ok(())
}

/// Checks if given window frame is valid. In particular, if the frame is RANGE
/// with offset PRECEDING/FOLLOWING, it must have exactly one ORDER BY column.
pub fn check_window_frame(frame: &WindowFrame, order_bys: usize) -> Result<()> {
if frame.units == WindowFrameUnits::Range && order_bys != 1 {
// See `regularize_window_order_by`.
if !(frame.start_bound.is_unbounded()
|| frame.start_bound == WindowFrameBound::CurrentRow)
|| !(frame.end_bound == WindowFrameBound::CurrentRow
|| frame.end_bound.is_unbounded())
{
plan_err!("RANGE requires exactly one ORDER BY column")?
}
} else if frame.units == WindowFrameUnits::Groups && order_bys == 0 {
plan_err!("GROUPS requires an ORDER BY clause")?
};
Ok(frame)
Ok(())
}

/// There are five ways to describe starting and ending frame boundaries:
Expand Down
8 changes: 5 additions & 3 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion_common::{
internal_err, plan_datafusion_err, Column, Constraint, Constraints, DFField,
DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue,
};
use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
use datafusion_expr::{
abs, acos, acosh, array, array_append, array_concat, array_dims, array_element,
array_except, array_has, array_has_all, array_has_any, array_intersect, array_length,
Expand All @@ -59,7 +60,6 @@ use datafusion_expr::{
sqrt, starts_with, string_to_array, strpos, struct_fun, substr, substr_index,
substring, tan, tanh, to_hex, to_timestamp_micros, to_timestamp_millis,
to_timestamp_nanos, to_timestamp_seconds, translate, trim, trunc, upper, uuid,
window_frame::regularize,
AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction,
Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet,
GroupingSet::GroupingSets,
Expand Down Expand Up @@ -1072,7 +1072,7 @@ pub fn parse_expr(
.iter()
.map(|e| parse_expr(e, registry))
.collect::<Result<Vec<_>, _>>()?;
let order_by = expr
let mut order_by = expr
.order_by
.iter()
.map(|e| parse_expr(e, registry))
Expand All @@ -1082,14 +1082,16 @@ pub fn parse_expr(
.as_ref()
.map::<Result<WindowFrame, _>, _>(|window_frame| {
let window_frame = window_frame.clone().try_into()?;
regularize(window_frame, order_by.len())
check_window_frame(&window_frame, order_by.len())
.map(|_| window_frame)
})
.transpose()?
.ok_or_else(|| {
DataFusionError::Execution(
"missing window frame during deserialization".to_string(),
)
})?;
regularize_window_order_by(&window_frame, &mut order_by)?;

match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
Expand Down
10 changes: 7 additions & 3 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use datafusion_common::{
};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::function::suggest_valid_function;
use datafusion_expr::window_frame::regularize;
use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
use datafusion_expr::{
expr, window_function, AggregateFunction, BuiltinScalarFunction, Expr, WindowFrame,
WindowFunction,
Expand Down Expand Up @@ -92,7 +92,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
let order_by = self.order_by_to_sort_expr(
let mut order_by = self.order_by_to_sort_expr(
&window.order_by,
schema,
planner_context,
Expand All @@ -104,14 +104,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.as_ref()
.map(|window_frame| {
let window_frame = window_frame.clone().try_into()?;
regularize(window_frame, order_by.len())
check_window_frame(&window_frame, order_by.len())
.map(|_| window_frame)
})
.transpose()?;

let window_frame = if let Some(window_frame) = window_frame {
regularize_window_order_by(&window_frame, &mut order_by)?;
window_frame
} else {
WindowFrame::new(!order_by.is_empty())
};

if let Ok(fun) = self.find_window_func(&name) {
let expr = match fun {
WindowFunction::AggregateFunction(aggregate_fun) => {
Expand Down
16 changes: 7 additions & 9 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3763,15 +3763,13 @@ select a,
1 1
2 2

# TODO: this is different to Postgres which returns [1, 1] for `rnk`.
# Comment it because it is flaky now as it depends on the order of the `a` column.
# query II
# select a,
# rank() over (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
# from (select 1 a union select 2 a) q ORDER BY rnk
# ----
# 1 1
# 2 2
query II
select a,
rank() over (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q ORDER BY a
----
1 1
2 1

# TODO: this works in Postgres which returns [1, 1].
query error DataFusion error: Arrow error: Invalid argument error: must either specify a row count or at least one column
Expand Down

0 comments on commit 205e315

Please sign in to comment.