Skip to content

Commit

Permalink
Introduce LexRequirement::collapse
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 7, 2025
1 parent 1d2c339 commit d2fe8a4
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 31 deletions.
19 changes: 19 additions & 0 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,25 @@ impl LexRequirement {
.collect(),
)
}

/// Consumes this `LexRequirement` and returns one in which every physical
/// expression appears at most once.
///
/// Entries are visited in their original order. An entry is kept only when
/// no previously kept entry has an equal `expr`; nothing other than `expr`
/// is compared, so the first occurrence of an expression always wins.
///
/// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a
/// Some(ASC)]`.
///
/// NOTE(review): earlier documentation also stated that
/// `vec![floor(a) Some(ASC), a Some(ASC)]` collapses to `vec![a Some(ASC)]`.
/// The implementation below only tests expressions for equality, so that
/// case does not appear to be handled here -- confirm before relying on it.
pub fn collapse(self) -> Self {
    let deduped = self.into_iter().fold(
        Vec::<PhysicalSortRequirement>::new(),
        |mut kept, candidate| {
            // Linear scan over what has been retained so far; only the
            // expression participates in the comparison.
            let seen = kept.iter().any(|prior| prior.expr.eq(&candidate.expr));
            if !seen {
                kept.push(candidate);
            }
            kept
        },
    );
    LexRequirement::new(deduped)
}
}

impl From<LexOrdering> for LexRequirement {
Expand Down
7 changes: 4 additions & 3 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
use super::{add_offset_to_expr, ProjectionMapping};
use crate::{
expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr, PhysicalSortRequirement,
Expand Down Expand Up @@ -527,12 +527,13 @@ impl EquivalenceGroup {
&self,
sort_reqs: &LexRequirement,
) -> LexRequirement {
collapse_lex_req(LexRequirement::new(
LexRequirement::new(
sort_reqs
.iter()
.map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
.collect(),
))
)
.collapse()
}

/// Projects `expr` according to the given projection mapping.
Expand Down
15 changes: 6 additions & 9 deletions datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;

use crate::expressions::Column;
use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement};
use crate::{LexRequirement, PhysicalExpr};

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};

Expand All @@ -41,14 +41,9 @@ pub use properties::{
/// It will also filter out entries that are ordered if the next entry is;
/// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to
/// `vec![a Some(ASC)]`.
#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")]
pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
let mut output = Vec::<PhysicalSortRequirement>::new();
for item in input {
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
output.push(item);
}
}
LexRequirement::new(output)
input.collapse()
}

/// Adds the `offset` value to `Column` indices inside `expr`. This function is
Expand Down Expand Up @@ -80,7 +75,9 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::{plan_datafusion_err, Result};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, PhysicalSortRequirement,
};

pub fn output_schema(
mapping: &ProjectionMapping,
Expand Down
18 changes: 7 additions & 11 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use std::{fmt, mem};

use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
use crate::equivalence::{
collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass,
ProjectionMapping,
EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
};
use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
use crate::{
Expand Down Expand Up @@ -500,15 +499,12 @@ impl EquivalenceProperties {
);
let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
// Prune redundant sections in the requirement:
collapse_lex_req(
normalized_sort_reqs
.iter()
.filter(|&order| {
!physical_exprs_contains(&constants_normalized, &order.expr)
})
.cloned()
.collect(),
)
normalized_sort_reqs
.iter()
.filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr))
.cloned()
.collect::<LexRequirement>()
.collapse()
}

/// Checks whether the given ordering is satisfied by any of the existing
Expand Down
9 changes: 4 additions & 5 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ use datafusion_execution::TaskContext;
use datafusion_expr::{Accumulator, Aggregate};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
equivalence::{collapse_lex_req, ProjectionMapping},
expressions::Column,
physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement,
PhysicalExpr, PhysicalSortRequirement,
equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains,
EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr,
PhysicalSortRequirement,
};

use itertools::Itertools;
Expand Down Expand Up @@ -473,7 +472,7 @@ impl AggregateExec {
&mode,
)?;
new_requirement.inner.extend(req);
new_requirement = collapse_lex_req(new_requirement);
new_requirement = new_requirement.collapse();

// If our aggregation has grouping sets then our base grouping exprs will
// be expanded based on the flags in `group_by.groups` where for each
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion_expr::{
PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
reverse_order_bys,
window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr},
Expand Down Expand Up @@ -469,8 +468,8 @@ pub fn get_window_mode(
{
let req = LexRequirement::new(
[partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(),
);
let req = collapse_lex_req(req);
)
.collapse();
if partition_by_eqs.ordering_satisfy_requirement(&req) {
// Window can be run with existing ordering
let mode = if indices.len() == partitionby_exprs.len() {
Expand Down

0 comments on commit d2fe8a4

Please sign in to comment.