From 544e05838f93afaf6cfdb0ec6f91e086f4c437b0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Jan 2025 14:35:30 -0500 Subject: [PATCH] WIP: Use IndexSet for OrderingEquivalenceSet --- .../physical-expr-common/src/sort_expr.rs | 8 +++ .../physical-expr/src/equivalence/mod.rs | 2 +- .../physical-expr/src/equivalence/ordering.rs | 58 +++++++++++++------ .../src/equivalence/properties.rs | 29 +++++----- 4 files changed, 64 insertions(+), 33 deletions(-) diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 63397e69c09d1..405177a1a430a 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -425,6 +425,14 @@ impl LexOrdering { } output } + + /// applies the method to each expr in this ordering + pub fn map(mut self, mut f: impl FnMut(&mut PhysicalSortExpr)) -> Self { + for sort_expr in self.inner.iter_mut() { + f(sort_expr) + } + self + } } impl From> for LexOrdering { diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index a5b85064e6252..97a7959f80f97 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -48,7 +48,7 @@ pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { /// Adds the `offset` value to `Column` indices inside `expr`. This function is /// generally used during the update of the right table schema in join operations. -pub fn add_offset_to_expr( +fn add_offset_to_expr( expr: Arc, offset: usize, ) -> Arc { diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index ae502d4d5f670..4de8cb470dcef 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -16,13 +16,13 @@ // under the License. use std::fmt::Display; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; use std::sync::Arc; -use std::vec::IntoIter; use crate::equivalence::add_offset_to_expr; use crate::{LexOrdering, PhysicalExpr}; use arrow_schema::SortOptions; +use indexmap::IndexSet; /// An `OrderingEquivalenceClass` object keeps track of different alternative /// orderings than can describe a schema. For example, consider the following table: @@ -37,9 +37,18 @@ use arrow_schema::SortOptions; /// /// Here, both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the table /// ordering. In this case, we say that these orderings are equivalent. -#[derive(Debug, Clone, Eq, PartialEq, Hash, Default)] +#[derive(Debug, Clone, Eq, PartialEq, Default)] pub struct OrderingEquivalenceClass { - orderings: Vec, + /// Use index set to maintain order but avoid duplicates. + orderings: IndexSet, +} + +impl Hash for OrderingEquivalenceClass { + fn hash(&self, state: &mut H) { + for ordering in &self.orderings { + ordering.hash(state); + } + } } impl OrderingEquivalenceClass { @@ -56,15 +65,16 @@ impl OrderingEquivalenceClass { /// Creates new ordering equivalence class from the given orderings /// /// Any redundant entries are removed - pub fn new(orderings: Vec) -> Self { - let mut result = Self { orderings }; - result.remove_redundant_entries(); - result + pub fn new(orderings: impl IntoIterator) -> Self { + let orderings = orderings.into_iter().collect(); + Self { orderings } } /// Converts this OrderingEquivalenceClass to a vector of orderings. + /// + // TODO remove / rename into_vec if it is needed pub fn into_inner(self) -> Vec { - self.orderings + self.orderings.into_iter().collect() } /// Checks whether `ordering` is a member of this equivalence class. @@ -175,32 +185,38 @@ impl OrderingEquivalenceClass { let n_ordering = self.orderings.len(); // Replicate entries before cross product let n_cross = std::cmp::max(n_ordering, other.len() * n_ordering); - self.orderings = self + let mut new_orderings: Vec<_> = self .orderings .iter() .cloned() .cycle() .take(n_cross) .collect(); + // Suffix orderings of other to the current orderings. for (outer_idx, ordering) in other.iter().enumerate() { for idx in 0..n_ordering { // Calculate cross product index let idx = outer_idx * n_ordering + idx; - self.orderings[idx].inner.extend(ordering.iter().cloned()); + new_orderings[idx].inner.extend(ordering.iter().cloned()); } } + // turn back to indexset + self.orderings = new_orderings.into_iter().collect(); self } /// Adds `offset` value to the index of each expression inside this /// ordering equivalence class. - pub fn add_offset(&mut self, offset: usize) { - for ordering in self.orderings.iter_mut() { - for sort_expr in ordering.inner.iter_mut() { - sort_expr.expr = add_offset_to_expr(Arc::clone(&sort_expr.expr), offset); - } - } + pub fn add_offset(mut self, offset: usize) -> Self { + self.orderings = + // update each offset and then recollect the set + self.orderings.into_iter() + .map(|ordering| { + ordering.map(|sort_expr| { sort_expr.expr = add_offset_to_expr(Arc::clone(&sort_expr.expr), offset); }) + }) + .collect(); + self } /// Gets sort options associated with this expression if it is a leading @@ -219,7 +235,7 @@ impl OrderingEquivalenceClass { /// Convert the `OrderingEquivalenceClass` into an iterator of LexOrderings impl IntoIterator for OrderingEquivalenceClass { type Item = LexOrdering; - type IntoIter = IntoIter; + type IntoIter = indexmap::set::IntoIter; fn into_iter(self) -> Self::IntoIter { self.orderings.into_iter() @@ -255,6 +271,12 @@ impl Display for OrderingEquivalenceClass { } } +impl From> for OrderingEquivalenceClass { + fn from(orderings: IndexSet) -> Self { + Self { orderings } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 2c7335649b28c..3b80b0869be60 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -199,8 +199,7 @@ impl EquivalenceProperties { OrderingEquivalenceClass::new( self.oeq_class .iter() - .map(|ordering| self.normalize_sort_exprs(ordering)) - .collect(), + .map(|ordering| self.normalize_sort_exprs(ordering)), ) } @@ -715,7 +714,7 @@ impl EquivalenceProperties { .iter() .map(|order| self.substitute_ordering_component(mapping, order)) .collect::>>()?; - let new_order = new_order.into_iter().flatten().collect(); + let new_order = new_order.into_iter().flatten(); self.oeq_class = OrderingEquivalenceClass::new(new_order); Ok(()) } @@ -1852,7 +1851,7 @@ pub fn join_equivalence_properties( } = left; let EquivalenceProperties { constants: right_constants, - oeq_class: mut right_oeq_class, + oeq_class: right_oeq_class, .. } = right; match maintains_input_order { @@ -1860,8 +1859,8 @@ pub fn join_equivalence_properties( // In this special case, right side ordering can be prefixed with // the left side ordering. if let (Some(JoinSide::Left), JoinType::Inner) = (probe_side, join_type) { - updated_right_ordering_equivalence_class( - &mut right_oeq_class, + let right_oeq_class = updated_right_ordering_equivalence_class( + right_oeq_class, join_type, left_size, ); @@ -1881,8 +1880,8 @@ pub fn join_equivalence_properties( } } [false, true] => { - updated_right_ordering_equivalence_class( - &mut right_oeq_class, + let right_oeq_class = updated_right_ordering_equivalence_class( + right_oeq_class, join_type, left_size, ); @@ -1927,15 +1926,17 @@ pub fn join_equivalence_properties( /// is the case for `Inner`, `Left`, `Full` and `Right` joins. For other cases, /// indices do not change. fn updated_right_ordering_equivalence_class( - right_oeq_class: &mut OrderingEquivalenceClass, + right_oeq_class: OrderingEquivalenceClass, join_type: &JoinType, left_size: usize, -) { +) -> OrderingEquivalenceClass { if matches!( join_type, JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right ) { - right_oeq_class.add_offset(left_size); + right_oeq_class.add_offset(left_size) + } else { + right_oeq_class } } @@ -2496,7 +2497,7 @@ mod tests { ]; let orderings = convert_to_orderings(&orderings); // Right child ordering equivalences - let mut right_oeq_class = OrderingEquivalenceClass::new(orderings); + let right_oeq_class = OrderingEquivalenceClass::new(orderings); let left_columns_len = 4; @@ -2519,8 +2520,8 @@ mod tests { join_eq_properties.add_equal_conditions(col_a, col_x)?; join_eq_properties.add_equal_conditions(col_d, col_w)?; - updated_right_ordering_equivalence_class( - &mut right_oeq_class, + let right_oeq_class = updated_right_ordering_equivalence_class( + right_oeq_class, &join_type, left_columns_len, );