Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ordering analysis with expressions (not just columns) by replacing OrderedColumn with PhysicalSortExpr #6501

Merged
merged 13 commits into from
Jun 5, 2023
Merged
14 changes: 10 additions & 4 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,8 @@ mod tests {
lit, ApproxDistinct, Column, Count, Median,
};
use datafusion_physical_expr::{
AggregateExpr, EquivalenceProperties, OrderedColumn,
OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
AggregateExpr, EquivalenceProperties, OrderingEquivalenceProperties,
PhysicalExpr, PhysicalSortExpr,
};
use futures::{FutureExt, Stream};
use std::any::Any;
Expand Down Expand Up @@ -1726,8 +1726,14 @@ mod tests {
eq_properties.add_equal_conditions((&col_a, &col_b));
let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema);
ordering_eq_properties.add_equal_conditions((
&vec![OrderedColumn::new(col_a.clone(), options1)],
&vec![OrderedColumn::new(col_c.clone(), options2)],
&vec![PhysicalSortExpr {
expr: Arc::new(col_a.clone()) as _,
options: options1,
}],
&vec![PhysicalSortExpr {
expr: Arc::new(col_c.clone()) as _,
options: options2,
}],
));

let order_by_exprs = vec![
Expand Down
20 changes: 2 additions & 18 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,26 +588,10 @@ pub fn ordering_equivalence_properties_helper(
// Return an empty OrderingEquivalenceProperties:
return oep;
};
let first_column = first_ordering
.iter()
.map(|e| TryFrom::try_from(e.clone()))
.collect::<Result<Vec<_>>>();
let checked_column_first = if let Ok(first) = first_column {
first
} else {
// Return an empty OrderingEquivalenceProperties:
return oep;
};
// First entry among eq_orderings is the head, skip it:
for ordering in eq_orderings.iter().skip(1) {
let column = ordering
.iter()
.map(|e| TryFrom::try_from(e.clone()))
.collect::<Result<Vec<_>>>();
if let Ok(column) = column {
if !column.is_empty() {
oep.add_equal_conditions((&checked_column_first, &column))
}
if !ordering.is_empty() {
oep.add_equal_conditions((first_ordering, ordering))
}
}
oep
Expand Down
11 changes: 6 additions & 5 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_e
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
use datafusion_physical_expr::{
OrderedColumn, OrderingEquivalenceProperties, PhysicalSortRequirement,
};
use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement};
pub use window_agg_exec::WindowAggExec;

/// Create a physical expression for window function
Expand Down Expand Up @@ -270,14 +268,17 @@ pub(crate) fn window_ordering_equivalence(
.is::<RowNumber>()
{
if let Some((idx, field)) =
schema.column_with_name(expr.field().unwrap().name())
schema.column_with_name(builtin_window_expr.name())
{
let column = Column::new(field.name(), idx);
let options = SortOptions {
descending: false,
nulls_first: false,
}; // ASC, NULLS LAST
let rhs = OrderedColumn::new(column, options);
let rhs = PhysicalSortExpr {
expr: Arc::new(column) as _,
options,
};
builder.add_equal_conditions(vec![rhs]);
}
}
Expand Down
24 changes: 24 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2405,6 +2405,30 @@ GlobalLimitExec: skip=0, fetch=5
------SortExec: expr=[c9@0 DESC]
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true

# This test shows that ordering equivalence can keep track of complex expressions (not just Column expressions)
# during ordering-satisfaction analysis. In the final plan we should see only a single SortExec.
query TT
EXPLAIN SELECT c5, c9, rn1 FROM (SELECT c5, c9,
ROW_NUMBER() OVER(ORDER BY c9 + c5 DESC) as rn1
FROM aggregate_test_100
ORDER BY c9 + c5 DESC)
ORDER BY rn1, c9 + c5 DESC
LIMIT 5
----
logical_plan
Limit: skip=0, fetch=5
--Sort: rn1 ASC NULLS LAST, CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST, fetch=5
----Sort: CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST
------Projection: aggregate_test_100.c5, aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
----------TableScan: aggregate_test_100 projection=[c5, c9]
physical_plan
GlobalLimitExec: skip=0, fetch=5
--ProjectionExec: expr=[c5@0 as c5, c9@1 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rn1]
----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC]
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true

# The following query has a type error. We should test that the error can be detected
# from either the logical plan (when `skip_failed_rules` is set to `false`) or
# the physical plan (when `skip_failed_rules` is set to `true`).
Expand Down
Loading