Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: RANGE frame for corner cases with empty ORDER BY clause should be treated as constant sort #8445

Merged
merged 4 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 37 additions & 19 deletions datafusion/expr/src/window_frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
//! - An ending frame boundary,
//! - An EXCLUDE clause.
use crate::expr::Sort;
use crate::Expr;
use datafusion_common::{plan_err, sql_err, DataFusionError, Result, ScalarValue};
use sqlparser::ast;
use sqlparser::parser::ParserError::ParserError;
Expand Down Expand Up @@ -142,41 +144,57 @@ impl WindowFrame {
}
}

/// Construct equivalent explicit window frames for implicit corner cases.
/// With this processing, we may assume in downstream code that RANGE/GROUPS
/// frames contain an appropriate ORDER BY clause.
pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> Result<WindowFrame> {
if frame.units == WindowFrameUnits::Range && order_bys != 1 {
/// Regularizes ORDER BY clause for window definition for implicit corner cases.
pub fn regularize_window_order_by(
frame: &WindowFrame,
order_by: &mut Vec<Expr>,
) -> Result<()> {
if frame.units == WindowFrameUnits::Range && order_by.len() != 1 {
// Normally, RANGE frames require an ORDER BY clause with exactly one
// column. However, an ORDER BY clause may be absent or present but with
// more than one column in two edge cases:
// 1. start bound is UNBOUNDED or CURRENT ROW
// 2. end bound is CURRENT ROW or UNBOUNDED.
// In these cases, we regularize the RANGE frame to be equivalent to a ROWS
// frame with the UNBOUNDED bounds.
// Note that this follows Postgres behavior.
// In these cases, we regularize the ORDER BY clause if the ORDER BY clause
// is absent. If an ORDER BY clause is present but has more than one column,
// the ORDER BY clause is unchanged. Note that this follows Postgres behavior.
if (frame.start_bound.is_unbounded()
|| frame.start_bound == WindowFrameBound::CurrentRow)
&& (frame.end_bound == WindowFrameBound::CurrentRow
|| frame.end_bound.is_unbounded())
Copy link
Member Author

@viirya viirya Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RANGE frame with UNBOUNDED or CURRENT ROW PRECEDING/FOLLOWING without ORDER BY, all rows of the partitions become peers of the current row, i.e., the sort expression returns the same for all rows.

{
// If an ORDER BY clause is absent, the frame is equivalent to a ROWS
// frame with the UNBOUNDED bounds.
// If an ORDER BY clause is present but has more than one column, the
// frame is unchanged.
if order_bys == 0 {
frame.units = WindowFrameUnits::Rows;
frame.start_bound =
WindowFrameBound::Preceding(ScalarValue::UInt64(None));
frame.end_bound = WindowFrameBound::Following(ScalarValue::UInt64(None));
// If an ORDER BY clause is absent, it is equivalent to a ORDER BY clause
// with constant value as sort key.
// If an ORDER BY clause is present but has more than one column, it is
// unchanged.
if order_by.is_empty() {
order_by.push(Expr::Sort(Sort::new(
Box::new(Expr::Literal(ScalarValue::UInt64(Some(1)))),
true,
false,
)));
}
} else {
}
}
Ok(())
}

/// Checks if given window frame is valid. In particular, if the frame is RANGE
/// with offset PRECEDING/FOLLOWING, it must have exactly one ORDER BY column.
pub fn check_window_frame(frame: &WindowFrame, order_bys: usize) -> Result<()> {
if frame.units == WindowFrameUnits::Range && order_bys != 1 {
// See `regularize_window_order_by`.
if !(frame.start_bound.is_unbounded()
|| frame.start_bound == WindowFrameBound::CurrentRow)
|| !(frame.end_bound == WindowFrameBound::CurrentRow
|| frame.end_bound.is_unbounded())
{
plan_err!("RANGE requires exactly one ORDER BY column")?
}
} else if frame.units == WindowFrameUnits::Groups && order_bys == 0 {
plan_err!("GROUPS requires an ORDER BY clause")?
};
Ok(frame)
Ok(())
}

/// There are five ways to describe starting and ending frame boundaries:
Expand Down
8 changes: 5 additions & 3 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion_common::{
internal_err, plan_datafusion_err, Column, Constraint, Constraints, DFField,
DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue,
};
use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
use datafusion_expr::{
abs, acos, acosh, array, array_append, array_concat, array_dims, array_element,
array_except, array_has, array_has_all, array_has_any, array_intersect, array_length,
Expand All @@ -58,7 +59,6 @@ use datafusion_expr::{
sqrt, starts_with, string_to_array, strpos, struct_fun, substr, substr_index,
substring, tan, tanh, to_hex, to_timestamp_micros, to_timestamp_millis,
to_timestamp_nanos, to_timestamp_seconds, translate, trim, trunc, upper, uuid,
window_frame::regularize,
AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction,
Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet,
GroupingSet::GroupingSets,
Expand Down Expand Up @@ -1070,7 +1070,7 @@ pub fn parse_expr(
.iter()
.map(|e| parse_expr(e, registry))
.collect::<Result<Vec<_>, _>>()?;
let order_by = expr
let mut order_by = expr
.order_by
.iter()
.map(|e| parse_expr(e, registry))
Expand All @@ -1080,14 +1080,16 @@ pub fn parse_expr(
.as_ref()
.map::<Result<WindowFrame, _>, _>(|window_frame| {
let window_frame = window_frame.clone().try_into()?;
regularize(window_frame, order_by.len())
check_window_frame(&window_frame, order_by.len())
.map(|_| window_frame)
})
.transpose()?
.ok_or_else(|| {
DataFusionError::Execution(
"missing window frame during deserialization".to_string(),
)
})?;
regularize_window_order_by(&window_frame, &mut order_by)?;

match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
Expand Down
10 changes: 7 additions & 3 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use datafusion_common::{
};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::function::suggest_valid_function;
use datafusion_expr::window_frame::regularize;
use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
use datafusion_expr::{
expr, window_function, AggregateFunction, BuiltinScalarFunction, Expr, WindowFrame,
WindowFunction,
Expand Down Expand Up @@ -92,7 +92,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
let order_by = self.order_by_to_sort_expr(
let mut order_by = self.order_by_to_sort_expr(
&window.order_by,
schema,
planner_context,
Expand All @@ -104,14 +104,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.as_ref()
.map(|window_frame| {
let window_frame = window_frame.clone().try_into()?;
regularize(window_frame, order_by.len())
check_window_frame(&window_frame, order_by.len())
.map(|_| window_frame)
})
.transpose()?;

let window_frame = if let Some(window_frame) = window_frame {
regularize_window_order_by(&window_frame, &mut order_by)?;
window_frame
} else {
WindowFrame::new(!order_by.is_empty())
};

if let Ok(fun) = self.find_window_func(&name) {
let expr = match fun {
WindowFunction::AggregateFunction(aggregate_fun) => {
Expand Down
16 changes: 7 additions & 9 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3763,15 +3763,13 @@ select a,
1 1
2 2

# TODO: this is different to Postgres which returns [1, 1] for `rnk`.
# Comment it because it is flaky now as it depends on the order of the `a` column.
# query II
# select a,
# rank() over (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
# from (select 1 a union select 2 a) q ORDER BY rnk
# ----
# 1 1
# 2 2
query II
select a,
rank() over (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q ORDER BY a
----
1 1
2 1

# TODO: this works in Postgres which returns [1, 1].
query error DataFusion error: Arrow error: Invalid argument error: must either specify a row count or at least one column
Expand Down