From d2fe8a4195d8386bcf73d26dbbdaa9f995ba794d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 Jan 2025 15:09:07 -0500 Subject: [PATCH] Introduce LexRequirement::collapse --- .../physical-expr-common/src/sort_expr.rs | 19 +++++++++++++++++++ .../physical-expr/src/equivalence/class.rs | 7 ++++--- .../physical-expr/src/equivalence/mod.rs | 15 ++++++--------- .../src/equivalence/properties.rs | 18 +++++++----------- .../physical-plan/src/aggregates/mod.rs | 9 ++++----- datafusion/physical-plan/src/windows/mod.rs | 5 ++--- 6 files changed, 42 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 35c9b63a536e..051c3be2f0b2 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -556,6 +556,25 @@ impl LexRequirement { .collect(), ) } + + /// Constructs a duplicate-free `LexOrderingReq` by filtering out + /// duplicate entries that have same physical expression inside. + /// + /// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a + /// Some(ASC)]`. + /// + /// It will also filter out entries that are ordered if the next entry is; + /// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to + /// `vec![a Some(ASC)]`. + pub fn collapse(self) -> Self { + let mut output = Vec::::new(); + for item in self { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + LexRequirement::new(output) + } } impl From for LexRequirement { diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 9e535a94eb6e..495cf211efe7 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; +use super::{add_offset_to_expr, ProjectionMapping}; use crate::{ expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, @@ -527,12 +527,13 @@ impl EquivalenceGroup { &self, sort_reqs: &LexRequirement, ) -> LexRequirement { - collapse_lex_req(LexRequirement::new( + LexRequirement::new( sort_reqs .iter() .map(|sort_req| self.normalize_sort_requirement(sort_req.clone())) .collect(), - )) + ) + .collapse() } /// Projects `expr` according to the given projection mapping. diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index d4c14f7bc8ff..60e508dd937a 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use crate::expressions::Column; -use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; +use crate::{LexRequirement, PhysicalExpr}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -41,14 +41,9 @@ pub use properties::{ /// It will also filter out entries that are ordered if the next entry is; /// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to /// `vec![a Some(ASC)]`. +#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")] pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { - let mut output = Vec::::new(); - for item in input { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item); - } - } - LexRequirement::new(output) + input.collapse() } /// Adds the `offset` value to `Column` indices inside `expr`. This function is @@ -80,7 +75,9 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::{plan_datafusion_err, Result}; - use datafusion_physical_expr_common::sort_expr::LexOrdering; + use datafusion_physical_expr_common::sort_expr::{ + LexOrdering, PhysicalSortRequirement, + }; pub fn output_schema( mapping: &ProjectionMapping, diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index b80dd56ff30b..a256b4576e05 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -24,8 +24,7 @@ use std::{fmt, mem}; use crate::equivalence::class::{const_exprs_contains, AcrossPartitions}; use crate::equivalence::{ - collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, - ProjectionMapping, + EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; use crate::expressions::{with_new_schema, CastExpr, Column, Literal}; use crate::{ @@ -500,15 +499,12 @@ impl EquivalenceProperties { ); let constants_normalized = self.eq_group.normalize_exprs(constant_exprs); // Prune redundant sections in the requirement: - collapse_lex_req( - normalized_sort_reqs - .iter() - .filter(|&order| { - !physical_exprs_contains(&constants_normalized, &order.expr) - }) - .cloned() - .collect(), - ) + normalized_sort_reqs + .iter() + .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr)) + .cloned() + .collect::() + .collapse() } /// Checks whether the given ordering is satisfied by any of the existing diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c04211d679ca..ef98be691c99 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -44,10 +44,9 @@ use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ - equivalence::{collapse_lex_req, ProjectionMapping}, - expressions::Column, - physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement, - PhysicalExpr, PhysicalSortRequirement, + equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains, + EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr, + PhysicalSortRequirement, }; use itertools::Itertools; @@ -473,7 +472,7 @@ impl AggregateExec { &mode, )?; new_requirement.inner.extend(req); - new_requirement = collapse_lex_req(new_requirement); + new_requirement = new_requirement.collapse(); // If our aggregation has grouping sets then our base grouping exprs will // be expanded based on the flags in `group_by.groups` where for each diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 36c4b9f18da9..510cbc248b63 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -32,7 +32,6 @@ use datafusion_expr::{ PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr}, @@ -469,8 +468,8 @@ pub fn get_window_mode( { let req = LexRequirement::new( [partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(), - ); - let req = collapse_lex_req(req); + ) + .collapse(); if partition_by_eqs.ordering_satisfy_requirement(&req) { // Window can be run with existing ordering let mode = if indices.len() == partitionby_exprs.len() {