From c72b98e41489c09d02cdb5e335c547cfdc5319c4 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Mon, 18 Sep 2023 13:23:41 +0300 Subject: [PATCH] Enhance/Refactor Ordering Equivalence Properties (#7566) * separate implementation of oeq properties * Simplifications * Move utils to methods * Remove unnecesary code * Address todo * Buggy is_aggressive mod eklenecek * start implementing aggressive mode * all tests pass * minor changes * All tests pass * Minor changes * All tests pass * minor changes * all tests pass * Simplifications * minor changes * Resolve linter error * Minor changes * minor changes * Update plan * Simplifications, update comments * Update comments, Use existing stats to find constants * Simplifications * Unknown input stats are handled * Address reviews * Simplifications * Simplifications * Address reviews * Fix subdirectories --------- Co-authored-by: berkaysynnada --- datafusion/common/src/stats.rs | 24 + .../enforce_distribution.rs | 24 +- datafusion/physical-expr/src/analysis.rs | 5 +- datafusion/physical-expr/src/equivalence.rs | 604 ++++++++++++++++-- datafusion/physical-expr/src/lib.rs | 10 +- datafusion/physical-expr/src/partitioning.rs | 19 +- datafusion/physical-expr/src/utils.rs | 446 ++++--------- datafusion/physical-plan/src/filter.rs | 26 +- datafusion/physical-plan/src/joins/utils.rs | 228 +++---- datafusion/physical-plan/src/memory.rs | 6 +- datafusion/physical-plan/src/projection.rs | 6 +- datafusion/sqllogictest/test_files/select.slt | 106 +++ .../sqllogictest/test_files/subquery.slt | 25 +- datafusion/sqllogictest/test_files/window.slt | 9 +- 14 files changed, 952 insertions(+), 586 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index db788efef7cd..ca76e14cb8ab 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -19,6 +19,8 @@ use std::fmt::Display; +use arrow::datatypes::DataType; + use crate::ScalarValue; /// Statistics for a relation @@ -70,3 +72,25 @@ pub struct ColumnStatistics { /// Number of distinct values pub distinct_count: Option, } + +impl ColumnStatistics { + /// Column contains a single non null value (e.g constant). + pub fn is_singleton(&self) -> bool { + match (&self.min_value, &self.max_value) { + // Min and max values are the same and not infinity. + (Some(min), Some(max)) => !min.is_null() && !max.is_null() && (min == max), + (_, _) => false, + } + } + + /// Returns the [`ColumnStatistics`] corresponding to the given datatype by assigning infinite bounds. + pub fn new_with_unbounded_column(dt: &DataType) -> ColumnStatistics { + let null = ScalarValue::try_from(dt.clone()).ok(); + ColumnStatistics { + null_count: None, + max_value: null.clone(), + min_value: null, + distinct_count: None, + } + } +} diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 77d6e7d7123d..565f76affa9f 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -54,8 +54,7 @@ use datafusion_physical_expr::utils::{ map_columns_before_projection, ordering_satisfy_requirement_concrete, }; use datafusion_physical_expr::{ - expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, PhysicalExpr, - PhysicalSortRequirement, + expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement, }; use datafusion_common::internal_err; @@ -807,36 +806,21 @@ fn try_reorder( } else if !equivalence_properties.classes().is_empty() { normalized_expected = expected .iter() - .map(|e| { - normalize_expr_with_equivalence_properties( - e.clone(), - equivalence_properties.classes(), - ) - }) + .map(|e| equivalence_properties.normalize_expr(e.clone())) .collect::>(); assert_eq!(normalized_expected.len(), expected.len()); normalized_left_keys = join_keys .left_keys .iter() - .map(|e| { - normalize_expr_with_equivalence_properties( - e.clone(), - equivalence_properties.classes(), - ) - }) + .map(|e| equivalence_properties.normalize_expr(e.clone())) .collect::>(); assert_eq!(join_keys.left_keys.len(), normalized_left_keys.len()); normalized_right_keys = join_keys .right_keys .iter() - .map(|e| { - normalize_expr_with_equivalence_properties( - e.clone(), - equivalence_properties.classes(), - ) - }) + .map(|e| equivalence_properties.normalize_expr(e.clone())) .collect::>(); assert_eq!(join_keys.right_keys.len(), normalized_right_keys.len()); diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index d3fcdc11ad52..990c643c6b08 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -189,12 +189,15 @@ fn shrink_boundaries( })?; let final_result = graph.get_interval(*root_index); + // If during selectivity calculation we encounter an error, use 1.0 as cardinality estimate + // safest estimate(e.q largest possible value). let selectivity = calculate_selectivity( &final_result.lower.value, &final_result.upper.value, &target_boundaries, &initial_boundaries, - )?; + ) + .unwrap_or(1.0); if !(0.0..=1.0).contains(&selectivity) { return internal_err!("Selectivity is out of limit: {}", selectivity); diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index b8ca1acc1cf4..369c139aa30b 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -15,34 +15,37 @@ // specific language governing permissions and limitations // under the License. -use crate::expressions::Column; -use crate::utils::collect_columns; +use crate::expressions::{CastExpr, Column}; +use crate::utils::{collect_columns, merge_vectors}; use crate::{ - normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr, - PhysicalSortExpr, + LexOrdering, LexOrderingRef, LexOrderingReq, PhysicalExpr, PhysicalSortExpr, + PhysicalSortRequirement, }; use arrow::datatypes::SchemaRef; use arrow_schema::Fields; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::Result; +use itertools::izip; use std::collections::{HashMap, HashSet}; use std::hash::Hash; +use std::ops::Range; use std::sync::Arc; /// Represents a collection of [`EquivalentClass`] (equivalences /// between columns in relations) /// -/// This is used to represent both: +/// This is used to represent: /// /// 1. Equality conditions (like `A=B`), when `T` = [`Column`] -/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`PhysicalSortExpr`] #[derive(Debug, Clone)] -pub struct EquivalenceProperties { - classes: Vec>, +pub struct EquivalenceProperties { + classes: Vec>, schema: SchemaRef, } -impl EquivalenceProperties { +impl EquivalenceProperties { pub fn new(schema: SchemaRef) -> Self { EquivalenceProperties { classes: vec![], @@ -51,7 +54,7 @@ impl EquivalenceProperties { } /// return the set of equivalences - pub fn classes(&self) -> &[EquivalentClass] { + pub fn classes(&self) -> &[EquivalentClass] { &self.classes } @@ -60,7 +63,7 @@ impl EquivalenceProperties { } /// Add the [`EquivalentClass`] from `iter` to this list - pub fn extend>>(&mut self, iter: I) { + pub fn extend>>(&mut self, iter: I) { for ec in iter { self.classes.push(ec) } @@ -68,7 +71,7 @@ impl EquivalenceProperties { /// Adds new equal conditions into the EquivalenceProperties. New equal /// conditions usually come from equality predicates in a join/filter. - pub fn add_equal_conditions(&mut self, new_conditions: (&T, &T)) { + pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) { let mut idx1: Option = None; let mut idx2: Option = None; for (idx, class) in self.classes.iter_mut().enumerate() { @@ -106,7 +109,7 @@ impl EquivalenceProperties { } (None, None) => { // adding new pairs - self.classes.push(EquivalentClass::::new( + self.classes.push(EquivalentClass::::new( new_conditions.0.clone(), vec![new_conditions.1.clone()], )); @@ -114,6 +117,81 @@ impl EquivalenceProperties { _ => {} } } + + /// Normalizes physical expression according to `EquivalentClass`es inside `self.classes`. + /// expression is replaced with `EquivalentClass::head` expression if it is among `EquivalentClass::others`. + pub fn normalize_expr(&self, expr: Arc) -> Arc { + expr.clone() + .transform(&|expr| { + let normalized_form = + expr.as_any().downcast_ref::().and_then(|column| { + for class in &self.classes { + if class.contains(column) { + return Some(Arc::new(class.head().clone()) as _); + } + } + None + }); + Ok(if let Some(normalized_form) = normalized_form { + Transformed::Yes(normalized_form) + } else { + Transformed::No(expr) + }) + }) + .unwrap_or(expr) + } + + /// This function applies the \[`normalize_expr`] + /// function for all expression in `exprs` and returns a vector of + /// normalized physical expressions. + pub fn normalize_exprs( + &self, + exprs: &[Arc], + ) -> Vec> { + exprs + .iter() + .map(|expr| self.normalize_expr(expr.clone())) + .collect::>() + } + + /// This function normalizes `sort_requirement` according to `EquivalenceClasses` in the `self`. + /// If the given sort requirement doesn't belong to equivalence set inside + /// `self`, it returns `sort_requirement` as is. + pub fn normalize_sort_requirement( + &self, + mut sort_requirement: PhysicalSortRequirement, + ) -> PhysicalSortRequirement { + sort_requirement.expr = self.normalize_expr(sort_requirement.expr); + sort_requirement + } + + /// This function applies the \[`normalize_sort_requirement`] + /// function for all sort requirements in `sort_reqs` and returns a vector of + /// normalized sort expressions. + pub fn normalize_sort_requirements( + &self, + sort_reqs: &[PhysicalSortRequirement], + ) -> Vec { + let normalized_sort_reqs = sort_reqs + .iter() + .map(|sort_req| self.normalize_sort_requirement(sort_req.clone())) + .collect::>(); + collapse_vec(normalized_sort_reqs) + } + + /// Similar to the \[`normalize_sort_requirements`] this function normalizes + /// sort expressions in `sort_exprs` and returns a vector of + /// normalized sort expressions. + pub fn normalize_sort_exprs( + &self, + sort_exprs: &[PhysicalSortExpr], + ) -> Vec { + let sort_requirements = + PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); + let normalized_sort_requirement = + self.normalize_sort_requirements(&sort_requirements); + PhysicalSortRequirement::to_sort_exprs(normalized_sort_requirement) + } } /// `OrderingEquivalenceProperties` keeps track of columns that describe the @@ -131,17 +209,120 @@ impl EquivalenceProperties { /// where both `a ASC` and `b DESC` can describe the table ordering. With /// `OrderingEquivalenceProperties`, we can keep track of these equivalences /// and treat `a ASC` and `b DESC` as the same ordering requirement. -pub type OrderingEquivalenceProperties = EquivalenceProperties; +#[derive(Debug, Clone)] +pub struct OrderingEquivalenceProperties { + oeq_class: Option, + /// Keeps track of expressions that have constant value. + constants: Vec>, + schema: SchemaRef, +} impl OrderingEquivalenceProperties { + /// Create an empty `OrderingEquivalenceProperties` + pub fn new(schema: SchemaRef) -> Self { + Self { + oeq_class: None, + constants: vec![], + schema, + } + } + + /// Extends `OrderingEquivalenceProperties` by adding ordering inside the `other` + /// to the `self.oeq_class`. + pub fn extend(&mut self, other: Option) { + if let Some(other) = other { + if let Some(class) = &mut self.oeq_class { + class.others.insert(other.head); + class.others.extend(other.others); + } else { + self.oeq_class = Some(other); + } + } + } + + pub fn oeq_class(&self) -> Option<&OrderingEquivalentClass> { + self.oeq_class.as_ref() + } + + /// Adds new equal conditions into the EquivalenceProperties. New equal + /// conditions usually come from equality predicates in a join/filter. + pub fn add_equal_conditions(&mut self, new_conditions: (&LexOrdering, &LexOrdering)) { + if let Some(class) = &mut self.oeq_class { + class.insert(new_conditions.0.clone()); + class.insert(new_conditions.1.clone()); + } else { + let head = new_conditions.0.clone(); + let others = vec![new_conditions.1.clone()]; + self.oeq_class = Some(OrderingEquivalentClass::new(head, others)) + } + } + + /// Add physical expression that have constant value to the `self.constants` + pub fn with_constants(mut self, constants: Vec>) -> Self { + constants.into_iter().for_each(|constant| { + if !physical_exprs_contains(&self.constants, &constant) { + self.constants.push(constant); + } + }); + self + } + + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + /// This function normalizes `sort_reqs` by + /// - removing expressions that have constant value from requirement + /// - replacing sections that are in the `self.oeq_class.others` with `self.oeq_class.head` + /// - removing sections that satisfies global ordering that are in the post fix of requirement + pub fn normalize_sort_requirements( + &self, + sort_reqs: &[PhysicalSortRequirement], + ) -> Vec { + let normalized_sort_reqs = + prune_sort_reqs_with_constants(sort_reqs, &self.constants); + let mut normalized_sort_reqs = collapse_lex_req(normalized_sort_reqs); + if let Some(oeq_class) = &self.oeq_class { + for item in oeq_class.others() { + let item = PhysicalSortRequirement::from_sort_exprs(item); + let item = prune_sort_reqs_with_constants(&item, &self.constants); + let ranges = get_compatible_ranges(&normalized_sort_reqs, &item); + let mut offset: i64 = 0; + for Range { start, end } in ranges { + let head = PhysicalSortRequirement::from_sort_exprs(oeq_class.head()); + let mut head = prune_sort_reqs_with_constants(&head, &self.constants); + let updated_start = (start as i64 + offset) as usize; + let updated_end = (end as i64 + offset) as usize; + let range = end - start; + offset += head.len() as i64 - range as i64; + let all_none = normalized_sort_reqs[updated_start..updated_end] + .iter() + .all(|req| req.options.is_none()); + if all_none { + for req in head.iter_mut() { + req.options = None; + } + } + normalized_sort_reqs.splice(updated_start..updated_end, head); + } + } + normalized_sort_reqs = simplify_lex_req(normalized_sort_reqs, oeq_class); + } + collapse_lex_req(normalized_sort_reqs) + } + /// Checks whether `leading_ordering` is contained in any of the ordering /// equivalence classes. pub fn satisfies_leading_ordering( &self, leading_ordering: &PhysicalSortExpr, ) -> bool { - for cls in &self.classes { - for ordering in cls.others.iter().chain(std::iter::once(&cls.head)) { + if let Some(oeq_class) = &self.oeq_class { + for ordering in oeq_class + .others + .iter() + .chain(std::iter::once(&oeq_class.head)) + { if ordering[0].eq(leading_ordering) { return true; } @@ -280,6 +461,55 @@ impl OrderingEquivalentClass { self.insert(update_with_alias(ordering, oeq_alias_map)); } } + + /// Adds `offset` value to the index of each expression inside `self.head` and `self.others`. + pub fn add_offset(&self, offset: usize) -> Result { + let head = add_offset_to_lex_ordering(self.head(), offset)?; + let others = self + .others() + .iter() + .map(|ordering| add_offset_to_lex_ordering(ordering, offset)) + .collect::>>()?; + Ok(OrderingEquivalentClass::new(head, others)) + } + + /// This function normalizes `OrderingEquivalenceProperties` according to `eq_properties`. + /// More explicitly, it makes sure that expressions in `oeq_class` are head entries + /// in `eq_properties`, replacing any non-head entries with head entries if necessary. + pub fn normalize_with_equivalence_properties( + &self, + eq_properties: &EquivalenceProperties, + ) -> OrderingEquivalentClass { + let head = eq_properties.normalize_sort_exprs(self.head()); + + let others = self + .others() + .iter() + .map(|other| eq_properties.normalize_sort_exprs(other)) + .collect(); + + EquivalentClass::new(head, others) + } + + /// Prefix with existing ordering. + pub fn prefix_ordering_equivalent_class_with_existing_ordering( + &self, + existing_ordering: &[PhysicalSortExpr], + eq_properties: &EquivalenceProperties, + ) -> OrderingEquivalentClass { + let existing_ordering = eq_properties.normalize_sort_exprs(existing_ordering); + let normalized_head = eq_properties.normalize_sort_exprs(self.head()); + let updated_head = merge_vectors(&existing_ordering, &normalized_head); + let updated_others = self + .others() + .iter() + .map(|ordering| { + let normalized_ordering = eq_properties.normalize_sort_exprs(ordering); + merge_vectors(&existing_ordering, &normalized_ordering) + }) + .collect(); + OrderingEquivalentClass::new(updated_head, updated_others) + } } /// This is a builder object facilitating incremental construction @@ -308,7 +538,7 @@ impl OrderingEquivalenceBuilder { new_ordering_eq_properties: OrderingEquivalenceProperties, ) -> Self { self.ordering_eq_properties - .extend(new_ordering_eq_properties.classes().iter().cloned()); + .extend(new_ordering_eq_properties.oeq_class().cloned()); self } @@ -334,10 +564,7 @@ impl OrderingEquivalenceBuilder { let mut normalized_out_ordering = vec![]; for item in &self.existing_ordering { // To account for ordering equivalences, first normalize the expression: - let normalized = normalize_expr_with_equivalence_properties( - item.expr.clone(), - self.eq_properties.classes(), - ); + let normalized = self.eq_properties.normalize_expr(item.expr.clone()); normalized_out_ordering.push(PhysicalSortExpr { expr: normalized, options: item.options, @@ -459,40 +686,77 @@ pub fn project_ordering_equivalence_properties( let schema = output_eq.schema(); let fields = schema.fields(); - let mut eq_classes = input_eq.classes().to_vec(); + let oeq_class = input_eq.oeq_class(); + let mut oeq_class = if let Some(oeq_class) = oeq_class { + oeq_class.clone() + } else { + return; + }; let mut oeq_alias_map = vec![]; for (column, columns) in columns_map { if is_column_invalid_in_new_schema(column, fields) { oeq_alias_map.push((column.clone(), columns[0].clone())); } } - for class in eq_classes.iter_mut() { - class.update_with_aliases(&oeq_alias_map, fields); - } + oeq_class.update_with_aliases(&oeq_alias_map, fields); - // Prune columns that are no longer in the schema from the OrderingEquivalenceProperties. - for class in eq_classes.iter_mut() { - let sort_exprs_to_remove = class - .iter() - .filter(|sort_exprs| { - sort_exprs.iter().any(|sort_expr| { - let cols_in_expr = collect_columns(&sort_expr.expr); - // If any one of the columns, used in Expression is invalid, remove expression - // from ordering equivalences - cols_in_expr - .iter() - .any(|col| is_column_invalid_in_new_schema(col, fields)) - }) + // Prune columns that no longer is in the schema from from the OrderingEquivalenceProperties. + let sort_exprs_to_remove = oeq_class + .iter() + .filter(|sort_exprs| { + sort_exprs.iter().any(|sort_expr| { + let cols_in_expr = collect_columns(&sort_expr.expr); + // If any one of the columns, used in Expression is invalid, remove expression + // from ordering equivalences + cols_in_expr + .iter() + .any(|col| is_column_invalid_in_new_schema(col, fields)) }) - .cloned() - .collect::>(); - for sort_exprs in sort_exprs_to_remove { - class.remove(&sort_exprs); + }) + .cloned() + .collect::>(); + for sort_exprs in sort_exprs_to_remove { + oeq_class.remove(&sort_exprs); + } + if oeq_class.len() > 1 { + output_eq.extend(Some(oeq_class)); + } +} + +/// Update `ordering` if it contains cast expression with target column +/// after projection, if there is no cast expression among `ordering` expressions, +/// returns `None`. +fn update_with_cast_exprs( + cast_exprs: &[(CastExpr, Column)], + mut ordering: LexOrdering, +) -> Option { + let mut is_changed = false; + for sort_expr in ordering.iter_mut() { + for (cast_expr, target_col) in cast_exprs.iter() { + if sort_expr.expr.eq(cast_expr.expr()) { + sort_expr.expr = Arc::new(target_col.clone()) as _; + is_changed = true; + } } } - eq_classes.retain(|props| props.len() > 1); + is_changed.then_some(ordering) +} - output_eq.extend(eq_classes); +/// Update cast expressions inside ordering equivalence +/// properties with its target column after projection +pub fn update_ordering_equivalence_with_cast( + cast_exprs: &[(CastExpr, Column)], + input_oeq: &mut OrderingEquivalenceProperties, +) { + if let Some(cls) = &mut input_oeq.oeq_class { + for ordering in + std::iter::once(cls.head().clone()).chain(cls.others().clone().into_iter()) + { + if let Some(updated_ordering) = update_with_cast_exprs(cast_exprs, ordering) { + cls.insert(updated_ordering); + } + } + } } /// Retrieves the ordering equivalence properties for a given schema and output ordering. @@ -516,6 +780,197 @@ pub fn ordering_equivalence_properties_helper( oep } +/// This function constructs a duplicate-free vector by filtering out duplicate +/// entries inside the given vector `input`. +fn collapse_vec(input: Vec) -> Vec { + let mut output = vec![]; + for item in input { + if !output.contains(&item) { + output.push(item); + } + } + output +} + +/// This function constructs a duplicate-free `LexOrderingReq` by filtering out duplicate +/// entries that have same physical expression inside the given vector `input`. +/// `vec![a Some(Asc), a Some(Desc)]` is collapsed to the `vec![a Some(Asc)]`. Since +/// when same expression is already seen before, following expressions are redundant. +fn collapse_lex_req(input: LexOrderingReq) -> LexOrderingReq { + let mut output = vec![]; + for item in input { + if !lex_req_contains(&output, &item) { + output.push(item); + } + } + output +} + +/// Check whether `sort_req.expr` is among the expressions of `lex_req`. +fn lex_req_contains( + lex_req: &[PhysicalSortRequirement], + sort_req: &PhysicalSortRequirement, +) -> bool { + for constant in lex_req { + if constant.expr.eq(&sort_req.expr) { + return true; + } + } + false +} + +/// This function simplifies lexicographical ordering requirement +/// inside `input` by removing postfix lexicographical requirements +/// that satisfy global ordering (occurs inside the ordering equivalent class) +fn simplify_lex_req( + input: LexOrderingReq, + oeq_class: &OrderingEquivalentClass, +) -> LexOrderingReq { + let mut section = &input[..]; + loop { + let n_prune = prune_last_n_that_is_in_oeq(section, oeq_class); + // Cannot prune entries from the end of requirement + if n_prune == 0 { + break; + } + section = §ion[0..section.len() - n_prune]; + } + if section.is_empty() { + PhysicalSortRequirement::from_sort_exprs(oeq_class.head()) + } else { + section.to_vec() + } +} + +/// Determines how many entries from the end can be deleted. +/// Last n entry satisfies global ordering, hence having them +/// as postfix in the lexicographical requirement is unnecessary. +/// Assume requirement is [a ASC, b ASC, c ASC], also assume that +/// existing ordering is [c ASC, d ASC]. In this case, since [c ASC] +/// is satisfied by the existing ordering (e.g corresponding section is global ordering), +/// [c ASC] can be pruned from the requirement: [a ASC, b ASC, c ASC]. In this case, +/// this function will return 1, to indicate last element can be removed from the requirement +fn prune_last_n_that_is_in_oeq( + input: &[PhysicalSortRequirement], + oeq_class: &OrderingEquivalentClass, +) -> usize { + let input_len = input.len(); + for ordering in std::iter::once(oeq_class.head()).chain(oeq_class.others().iter()) { + let mut search_range = std::cmp::min(ordering.len(), input_len); + while search_range > 0 { + let req_section = &input[input_len - search_range..]; + // let given_section = &ordering[0..search_range]; + if req_satisfied(ordering, req_section) { + return search_range; + } else { + search_range -= 1; + } + } + } + 0 +} + +/// Checks whether given section satisfies req. +fn req_satisfied(given: LexOrderingRef, req: &[PhysicalSortRequirement]) -> bool { + for (given, req) in izip!(given.iter(), req.iter()) { + let PhysicalSortRequirement { expr, options } = req; + if let Some(options) = options { + if options != &given.options || !expr.eq(&given.expr) { + return false; + } + } else if !expr.eq(&given.expr) { + return false; + } + } + true +} + +/// This function searches for the slice `section` inside the slice `given`. +/// It returns each range where `section` is compatible with the corresponding +/// slice in `given`. +fn get_compatible_ranges( + given: &[PhysicalSortRequirement], + section: &[PhysicalSortRequirement], +) -> Vec> { + let n_section = section.len(); + let n_end = if given.len() >= n_section { + given.len() - n_section + 1 + } else { + 0 + }; + (0..n_end) + .filter_map(|idx| { + let end = idx + n_section; + given[idx..end] + .iter() + .zip(section) + .all(|(req, given)| given.compatible(req)) + .then_some(Range { start: idx, end }) + }) + .collect() +} + +/// It is similar to contains method of vector. +/// Finds whether `expr` is among `physical_exprs`. +pub fn physical_exprs_contains( + physical_exprs: &[Arc], + expr: &Arc, +) -> bool { + physical_exprs + .iter() + .any(|physical_expr| physical_expr.eq(expr)) +} + +/// Remove ordering requirements that have constant value +fn prune_sort_reqs_with_constants( + ordering: &[PhysicalSortRequirement], + constants: &[Arc], +) -> Vec { + ordering + .iter() + .filter(|&order| !physical_exprs_contains(constants, &order.expr)) + .cloned() + .collect() +} + +/// Adds the `offset` value to `Column` indices inside `expr`. This function is +/// generally used during the update of the right table schema in join operations. +pub(crate) fn add_offset_to_expr( + expr: Arc, + offset: usize, +) -> Result> { + expr.transform_down(&|e| match e.as_any().downcast_ref::() { + Some(col) => Ok(Transformed::Yes(Arc::new(Column::new( + col.name(), + offset + col.index(), + )))), + None => Ok(Transformed::No(e)), + }) +} + +/// Adds the `offset` value to `Column` indices inside `sort_expr.expr`. +pub(crate) fn add_offset_to_sort_expr( + sort_expr: &PhysicalSortExpr, + offset: usize, +) -> Result { + Ok(PhysicalSortExpr { + expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?, + options: sort_expr.options, + }) +} + +/// Adds the `offset` value to `Column` indices for each `sort_expr.expr` +/// inside `sort_exprs`. +pub fn add_offset_to_lex_ordering( + sort_exprs: LexOrderingRef, + offset: usize, +) -> Result { + sort_exprs + .iter() + .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset)) + .collect() +} + #[cfg(test)] mod tests { use super::*; @@ -523,8 +978,20 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::Result; + use arrow_schema::SortOptions; use std::sync::Arc; + fn convert_to_requirement( + in_data: &[(&Column, Option)], + ) -> Vec { + in_data + .iter() + .map(|(col, options)| { + PhysicalSortRequirement::new(Arc::new((*col).clone()) as _, *options) + }) + .collect::>() + } + #[test] fn add_equal_conditions_test() -> Result<()> { let schema = Arc::new(Schema::new(vec![ @@ -615,4 +1082,53 @@ mod tests { Ok(()) } + + #[test] + fn test_collapse_vec() -> Result<()> { + assert_eq!(collapse_vec(vec![1, 2, 3]), vec![1, 2, 3]); + assert_eq!(collapse_vec(vec![1, 2, 3, 2, 3]), vec![1, 2, 3]); + assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]); + Ok(()) + } + + #[test] + fn test_get_compatible_ranges() -> Result<()> { + let col_a = &Column::new("a", 0); + let col_b = &Column::new("b", 1); + let option1 = SortOptions { + descending: false, + nulls_first: false, + }; + let test_data = vec![ + ( + vec![(col_a, Some(option1)), (col_b, Some(option1))], + vec![(col_a, Some(option1))], + vec![(0, 1)], + ), + ( + vec![(col_a, None), (col_b, Some(option1))], + vec![(col_a, Some(option1))], + vec![(0, 1)], + ), + ( + vec![ + (col_a, None), + (col_b, Some(option1)), + (col_a, Some(option1)), + ], + vec![(col_a, Some(option1))], + vec![(0, 1), (2, 3)], + ), + ]; + for (searched, to_search, expected) in test_data { + let searched = convert_to_requirement(&searched); + let to_search = convert_to_requirement(&to_search); + let expected = expected + .into_iter() + .map(|(start, end)| Range { start, end }) + .collect::>(); + assert_eq!(get_compatible_ranges(&searched, &to_search), expected); + } + Ok(()) + } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 85081c24c343..e83dee2e6c80 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -55,9 +55,10 @@ pub use aggregate::groups_accumulator::{ pub use aggregate::AggregateExpr; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; pub use equivalence::{ - ordering_equivalence_properties_helper, project_equivalence_properties, - project_ordering_equivalence_properties, EquivalenceProperties, EquivalentClass, - OrderingEquivalenceProperties, OrderingEquivalentClass, + add_offset_to_lex_ordering, ordering_equivalence_properties_helper, + project_equivalence_properties, project_ordering_equivalence_properties, + EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties, + OrderingEquivalentClass, }; pub use partitioning::{Distribution, Partitioning}; @@ -70,7 +71,6 @@ pub use sort_expr::{ }; pub use sort_properties::update_ordering; pub use utils::{ - expr_list_eq_any_order, expr_list_eq_strict_order, find_orderings_of_exprs, - normalize_expr_with_equivalence_properties, normalize_ordering_equivalence_classes, + expr_list_eq_any_order, expr_list_eq_strict_order, normalize_out_expr_with_columns_map, reverse_order_bys, split_conjunction, }; diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 76567c80509c..773eac40dc8a 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -20,10 +20,7 @@ use std::fmt; use std::sync::Arc; -use crate::{ - expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, - EquivalenceProperties, PhysicalExpr, -}; +use crate::{expr_list_eq_strict_order, EquivalenceProperties, PhysicalExpr}; /// Partitioning schemes supported by operators. #[derive(Debug, Clone)] @@ -90,21 +87,11 @@ impl Partitioning { if !eq_classes.is_empty() { let normalized_required_exprs = required_exprs .iter() - .map(|e| { - normalize_expr_with_equivalence_properties( - e.clone(), - eq_classes, - ) - }) + .map(|e| eq_properties.normalize_expr(e.clone())) .collect::>(); let normalized_partition_exprs = partition_exprs .iter() - .map(|e| { - normalize_expr_with_equivalence_properties( - e.clone(), - eq_classes, - ) - }) + .map(|e| eq_properties.normalize_expr(e.clone())) .collect::>(); expr_list_eq_strict_order( &normalized_required_exprs, diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 2d3a395728d0..b2a6bb5ca6d2 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -15,21 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::borrow::Borrow; -use std::collections::{HashMap, HashSet}; -use std::ops::Range; -use std::sync::Arc; - -use crate::equivalence::{ - EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties, - OrderingEquivalentClass, -}; +use crate::equivalence::{EquivalenceProperties, OrderingEquivalenceProperties}; use crate::expressions::{BinaryExpr, Column, UnKnownColumn}; use crate::sort_properties::{ExprOrdering, SortProperties}; use crate::update_ordering; -use crate::{ - LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, -}; +use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; @@ -42,8 +32,13 @@ use datafusion_common::utils::longest_consecutive_prefix; use datafusion_common::Result; use datafusion_expr::Operator; +use itertools::Itertools; use petgraph::graph::NodeIndex; use petgraph::stable_graph::StableGraph; +use std::borrow::Borrow; +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; /// Compare the two expr lists are equal no matter the order. /// For example two InListExpr can be considered to be equals no matter the order: @@ -135,109 +130,6 @@ pub fn normalize_out_expr_with_columns_map( .unwrap_or(expr) } -pub fn normalize_expr_with_equivalence_properties( - expr: Arc, - eq_properties: &[EquivalentClass], -) -> Arc { - expr.clone() - .transform(&|expr| { - let normalized_form = - expr.as_any().downcast_ref::().and_then(|column| { - for class in eq_properties { - if class.contains(column) { - return Some(Arc::new(class.head().clone()) as _); - } - } - None - }); - Ok(if let Some(normalized_form) = normalized_form { - Transformed::Yes(normalized_form) - } else { - Transformed::No(expr) - }) - }) - .unwrap_or(expr) -} - -/// This function normalizes `sort_expr` according to `eq_properties`. If the -/// given sort expression doesn't belong to equivalence set `eq_properties`, -/// it returns `sort_expr` as is. -fn normalize_sort_expr_with_equivalence_properties( - mut sort_expr: PhysicalSortExpr, - eq_properties: &[EquivalentClass], -) -> PhysicalSortExpr { - sort_expr.expr = - normalize_expr_with_equivalence_properties(sort_expr.expr, eq_properties); - sort_expr -} - -/// This function applies the [`normalize_sort_expr_with_equivalence_properties`] -/// function for all sort expressions in `sort_exprs` and returns a vector of -/// normalized sort expressions. -pub fn normalize_sort_exprs_with_equivalence_properties( - sort_exprs: LexOrderingRef, - eq_properties: &EquivalenceProperties, -) -> LexOrdering { - sort_exprs - .iter() - .map(|expr| { - normalize_sort_expr_with_equivalence_properties( - expr.clone(), - eq_properties.classes(), - ) - }) - .collect() -} - -/// This function normalizes `sort_requirement` according to `eq_properties`. -/// If the given sort requirement doesn't belong to equivalence set -/// `eq_properties`, it returns `sort_requirement` as is. -fn normalize_sort_requirement_with_equivalence_properties( - mut sort_requirement: PhysicalSortRequirement, - eq_properties: &[EquivalentClass], -) -> PhysicalSortRequirement { - sort_requirement.expr = - normalize_expr_with_equivalence_properties(sort_requirement.expr, eq_properties); - sort_requirement -} - -/// This function searches for the slice `section` inside the slice `given`. -/// It returns each range where `section` is compatible with the corresponding -/// slice in `given`. -fn get_compatible_ranges( - given: &[PhysicalSortRequirement], - section: &[PhysicalSortRequirement], -) -> Vec> { - let n_section = section.len(); - let n_end = if given.len() >= n_section { - given.len() - n_section + 1 - } else { - 0 - }; - (0..n_end) - .filter_map(|idx| { - let end = idx + n_section; - given[idx..end] - .iter() - .zip(section) - .all(|(req, given)| given.compatible(req)) - .then_some(Range { start: idx, end }) - }) - .collect() -} - -/// This function constructs a duplicate-free vector by filtering out duplicate -/// entries inside the given vector `input`. -fn collapse_vec(input: Vec) -> Vec { - let mut output = vec![]; - for item in input { - if !output.contains(&item) { - output.push(item); - } - } - output -} - /// Transform `sort_exprs` vector, to standardized version using `eq_properties` and `ordering_eq_properties` /// Assume `eq_properties` states that `Column a` and `Column b` are aliases. /// Also assume `ordering_eq_properties` states that ordering `vec![d ASC]` and `vec![a ASC, c ASC]` are @@ -246,10 +138,10 @@ fn collapse_vec(input: Vec) -> Vec { /// This function converts `sort_exprs` `vec![b ASC, c ASC]` to first `vec![a ASC, c ASC]` after considering `eq_properties` /// Then converts `vec![a ASC, c ASC]` to `vec![d ASC]` after considering `ordering_eq_properties`. /// Standardized version `vec![d ASC]` is used in subsequent operations. -pub fn normalize_sort_exprs( +fn normalize_sort_exprs( sort_exprs: &[PhysicalSortExpr], - eq_properties: &[EquivalentClass], - ordering_eq_properties: &[OrderingEquivalentClass], + eq_properties: &EquivalenceProperties, + ordering_eq_properties: &OrderingEquivalenceProperties, ) -> Vec { let sort_requirements = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); let normalized_exprs = normalize_sort_requirements( @@ -257,35 +149,7 @@ pub fn normalize_sort_exprs( eq_properties, ordering_eq_properties, ); - let normalized_exprs = PhysicalSortRequirement::to_sort_exprs(normalized_exprs); - collapse_vec(normalized_exprs) -} -/// This function normalizes `oeq_classes` expressions according to `eq_properties`. -/// More explicitly, it makes sure that expressions in `oeq_classes` are head entries -/// in `eq_properties`, replacing any non-head entries with head entries if necessary. -pub fn normalize_ordering_equivalence_classes( - oeq_classes: &[OrderingEquivalentClass], - eq_properties: &EquivalenceProperties, -) -> Vec { - oeq_classes - .iter() - .map(|class| { - let head = normalize_sort_exprs_with_equivalence_properties( - class.head(), - eq_properties, - ); - - let others = class - .others() - .iter() - .map(|other| { - normalize_sort_exprs_with_equivalence_properties(other, eq_properties) - }) - .collect(); - - EquivalentClass::new(head, others) - }) - .collect() + PhysicalSortRequirement::to_sort_exprs(normalized_exprs) } /// Transform `sort_reqs` vector, to standardized version using `eq_properties` and `ordering_eq_properties` @@ -296,53 +160,13 @@ pub fn normalize_ordering_equivalence_classes( /// This function converts `sort_exprs` `vec![b Some(ASC), c None]` to first `vec![a Some(ASC), c None]` after considering `eq_properties` /// Then converts `vec![a Some(ASC), c None]` to `vec![d Some(ASC)]` after considering `ordering_eq_properties`. /// Standardized version `vec![d Some(ASC)]` is used in subsequent operations. -pub fn normalize_sort_requirements( +fn normalize_sort_requirements( sort_reqs: &[PhysicalSortRequirement], - eq_properties: &[EquivalentClass], - ordering_eq_properties: &[OrderingEquivalentClass], + eq_properties: &EquivalenceProperties, + ordering_eq_properties: &OrderingEquivalenceProperties, ) -> Vec { - let mut normalized_exprs = sort_reqs - .iter() - .map(|sort_req| { - normalize_sort_requirement_with_equivalence_properties( - sort_req.clone(), - eq_properties, - ) - }) - .collect::>(); - for ordering_eq_class in ordering_eq_properties { - for item in ordering_eq_class.others() { - let item = item - .clone() - .into_iter() - .map(|elem| elem.into()) - .collect::>(); - let ranges = get_compatible_ranges(&normalized_exprs, &item); - let mut offset: i64 = 0; - for Range { start, end } in ranges { - let mut head = ordering_eq_class - .head() - .clone() - .into_iter() - .map(|elem| elem.into()) - .collect::>(); - let updated_start = (start as i64 + offset) as usize; - let updated_end = (end as i64 + offset) as usize; - let range = end - start; - offset += head.len() as i64 - range as i64; - let all_none = normalized_exprs[updated_start..updated_end] - .iter() - .all(|req| req.options.is_none()); - if all_none { - for req in head.iter_mut() { - req.options = None; - } - } - normalized_exprs.splice(updated_start..updated_end, head); - } - } - } - collapse_vec(normalized_exprs) + let normalized_sort_reqs = eq_properties.normalize_sort_requirements(sort_reqs); + ordering_eq_properties.normalize_sort_requirements(&normalized_sort_reqs) } /// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s. @@ -379,13 +203,11 @@ pub fn ordering_satisfy_concrete< ordering_equal_properties: F2, ) -> bool { let oeq_properties = ordering_equal_properties(); - let ordering_eq_classes = oeq_properties.classes(); let eq_properties = equal_properties(); - let eq_classes = eq_properties.classes(); let required_normalized = - normalize_sort_exprs(required, eq_classes, ordering_eq_classes); + normalize_sort_exprs(required, &eq_properties, &oeq_properties); let provided_normalized = - normalize_sort_exprs(provided, eq_classes, ordering_eq_classes); + normalize_sort_exprs(provided, &eq_properties, &oeq_properties); if required_normalized.len() > provided_normalized.len() { return false; } @@ -430,13 +252,11 @@ pub fn ordering_satisfy_requirement_concrete< ordering_equal_properties: F2, ) -> bool { let oeq_properties = ordering_equal_properties(); - let ordering_eq_classes = oeq_properties.classes(); let eq_properties = equal_properties(); - let eq_classes = eq_properties.classes(); let required_normalized = - normalize_sort_requirements(required, eq_classes, ordering_eq_classes); + normalize_sort_requirements(required, &eq_properties, &oeq_properties); let provided_normalized = - normalize_sort_exprs(provided, eq_classes, ordering_eq_classes); + normalize_sort_exprs(provided, &eq_properties, &oeq_properties); if required_normalized.len() > provided_normalized.len() { return false; } @@ -481,14 +301,12 @@ fn requirements_compatible_concrete< equal_properties: F2, ) -> bool { let oeq_properties = ordering_equal_properties(); - let ordering_eq_classes = oeq_properties.classes(); let eq_properties = equal_properties(); - let eq_classes = eq_properties.classes(); let required_normalized = - normalize_sort_requirements(required, eq_classes, ordering_eq_classes); + normalize_sort_requirements(required, &eq_properties, &oeq_properties); let provided_normalized = - normalize_sort_requirements(provided, eq_classes, ordering_eq_classes); + normalize_sort_requirements(provided, &eq_properties, &oeq_properties); if required_normalized.len() > provided_normalized.len() { return false; } @@ -542,26 +360,15 @@ pub fn convert_to_expr>( /// This function finds the indices of `targets` within `items`, taking into /// account equivalences according to `equal_properties`. -pub fn get_indices_of_matching_exprs< - T: Borrow>, - F: FnOnce() -> EquivalenceProperties, ->( - targets: impl IntoIterator, +pub fn get_indices_of_matching_exprs EquivalenceProperties>( + targets: &[Arc], items: &[Arc], equal_properties: F, ) -> Vec { - if let eq_classes @ [_, ..] = equal_properties().classes() { - let normalized_targets = targets.into_iter().map(|e| { - normalize_expr_with_equivalence_properties(e.borrow().clone(), eq_classes) - }); - let normalized_items = items - .iter() - .map(|e| normalize_expr_with_equivalence_properties(e.clone(), eq_classes)) - .collect::>(); - get_indices_of_exprs_strict(normalized_targets, &normalized_items) - } else { - get_indices_of_exprs_strict(targets, items) - } + let eq_properties = equal_properties(); + let normalized_items = eq_properties.normalize_exprs(items); + let normalized_targets = eq_properties.normalize_exprs(targets); + get_indices_of_exprs_strict(normalized_targets, &normalized_items) } /// This function finds the indices of `targets` within `items` using strict @@ -870,13 +677,13 @@ pub fn get_indices_of_matching_sort_exprs_with_order_eq( let normalized_required = normalize_sort_requirements( &sort_requirement_on_requirements, - eq_properties.classes(), - &[], + eq_properties, + &OrderingEquivalenceProperties::new(order_eq_properties.schema()), ); let normalized_provided = normalize_sort_requirements( &PhysicalSortRequirement::from_sort_exprs(provided_sorts.iter()), - eq_properties.classes(), - &[], + eq_properties, + &OrderingEquivalenceProperties::new(order_eq_properties.schema()), ); let provided_sorts = normalized_provided @@ -902,9 +709,9 @@ pub fn get_indices_of_matching_sort_exprs_with_order_eq( } // We did not find all the expressions, consult ordering equivalence properties: - for class in order_eq_properties.classes() { - let head = class.head(); - for ordering in class.others().iter().chain(std::iter::once(head)) { + if let Some(oeq_class) = order_eq_properties.oeq_class() { + let head = oeq_class.head(); + for ordering in oeq_class.others().iter().chain(std::iter::once(head)) { let order_eq_class_exprs = convert_to_expr(ordering); if let Some(indices_of_equality) = get_lexicographical_match_indices( &normalized_required_expr, @@ -981,6 +788,18 @@ pub fn find_orderings_of_exprs( Ok(orderings) } +/// Merge left and right sort expressions, checking for duplicates. +pub fn merge_vectors( + left: &[PhysicalSortExpr], + right: &[PhysicalSortExpr], +) -> Vec { + left.iter() + .cloned() + .chain(right.iter().cloned()) + .unique() + .collect() +} + #[cfg(test)] mod tests { use std::fmt::{Display, Formatter}; @@ -990,7 +809,7 @@ mod tests { use super::*; use crate::equivalence::OrderingEquivalenceProperties; use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal}; - use crate::PhysicalSortExpr; + use crate::{OrderingEquivalentClass, PhysicalSortExpr}; use arrow::compute::SortOptions; use arrow_array::Int32Array; @@ -1046,7 +865,8 @@ mod tests { let c = Field::new("c", DataType::Int32, true); let d = Field::new("d", DataType::Int32, true); let e = Field::new("e", DataType::Int32, true); - let schema = Arc::new(Schema::new(vec![a, b, c, d, e])); + let f = Field::new("f", DataType::Int32, true); + let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f])); Ok(schema) } @@ -1057,13 +877,15 @@ mod tests { OrderingEquivalenceProperties, )> { // Assume schema satisfies ordering a ASC NULLS LAST - // and d ASC NULLS LAST, b ASC NULLS LAST and e DESC NULLS FIRST, b ASC NULLS LAST + // and d ASC NULLS LAST, b ASC NULLS LAST and e DESC NULLS FIRST, f ASC NULLS LAST, g ASC NULLS LAST // Assume that column a and c are aliases. let col_a = &Column::new("a", 0); let col_b = &Column::new("b", 1); let col_c = &Column::new("c", 2); let col_d = &Column::new("d", 3); let col_e = &Column::new("e", 4); + let col_f = &Column::new("f", 5); + let col_g = &Column::new("g", 6); let option1 = SortOptions { descending: false, nulls_first: false, @@ -1104,7 +926,11 @@ mod tests { options: option2, }, PhysicalSortExpr { - expr: Arc::new(col_b.clone()), + expr: Arc::new(col_f.clone()), + options: option1, + }, + PhysicalSortExpr { + expr: Arc::new(col_g.clone()), options: option1, }, ], @@ -1312,6 +1138,8 @@ mod tests { let col_c = &Column::new("c", 2); let col_d = &Column::new("d", 3); let col_e = &Column::new("e", 4); + let col_f = &Column::new("f", 5); + let col_g = &Column::new("g", 6); let option1 = SortOptions { descending: false, nulls_first: false, @@ -1342,10 +1170,16 @@ mod tests { (vec![(col_c, option1)], true), (vec![(col_c, option2)], false), // Test whether ordering equivalence works as expected - (vec![(col_d, option1)], false), + (vec![(col_d, option1)], true), (vec![(col_d, option1), (col_b, option1)], true), (vec![(col_d, option2), (col_b, option1)], false), - (vec![(col_e, option2), (col_b, option1)], true), + ( + vec![(col_e, option2), (col_f, option1), (col_g, option1)], + true, + ), + (vec![(col_e, option2), (col_f, option1)], true), + (vec![(col_e, option1), (col_f, option1)], false), + (vec![(col_e, option2), (col_b, option1)], false), (vec![(col_e, option1), (col_b, option1)], false), ( vec![ @@ -1356,6 +1190,15 @@ mod tests { ], true, ), + ( + vec![ + (col_d, option1), + (col_b, option1), + (col_e, option2), + (col_f, option1), + ], + true, + ), ( vec![ (col_d, option1), @@ -1372,6 +1215,15 @@ mod tests { (col_d, option2), (col_b, option1), ], + true, + ), + ( + vec![ + (col_d, option1), + (col_b, option1), + (col_e, option1), + (col_f, option1), + ], false, ), ( @@ -1383,7 +1235,9 @@ mod tests { ], false, ), + (vec![(col_d, option1), (col_e, option2)], true), ]; + for (cols, expected) in requirements { let err_msg = format!("Error in test case:{cols:?}"); let required = cols @@ -1427,6 +1281,7 @@ mod tests { let col_c = &Column::new("c", 2); let col_d = &Column::new("d", 3); let col_e = &Column::new("e", 4); + let col_f = &Column::new("f", 5); let option1 = SortOptions { descending: false, nulls_first: false, @@ -1438,36 +1293,46 @@ mod tests { // First element in the tuple stores vector of requirement, second element is the expected return value for ordering_satisfy function let requirements = vec![ (vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]), - (vec![(col_a, None)], vec![(col_a, None)]), + (vec![(col_a, Some(option2))], vec![(col_a, Some(option2))]), + (vec![(col_a, None)], vec![(col_a, Some(option1))]), // Test whether equivalence works as expected (vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]), - (vec![(col_c, None)], vec![(col_a, None)]), + (vec![(col_c, None)], vec![(col_a, Some(option1))]), // Test whether ordering equivalence works as expected ( vec![(col_d, Some(option1)), (col_b, Some(option1))], vec![(col_a, Some(option1))], ), - (vec![(col_d, None), (col_b, None)], vec![(col_a, None)]), ( - vec![(col_e, Some(option2)), (col_b, Some(option1))], + vec![(col_d, None), (col_b, None)], + vec![(col_a, Some(option1))], + ), + ( + vec![(col_e, Some(option2)), (col_f, Some(option1))], vec![(col_a, Some(option1))], ), // We should be able to normalize in compatible requirements also (not exactly equal) ( - vec![(col_e, Some(option2)), (col_b, None)], + vec![(col_e, Some(option2)), (col_f, None)], + vec![(col_a, Some(option1))], + ), + ( + vec![(col_e, None), (col_f, None)], vec![(col_a, Some(option1))], ), - (vec![(col_e, None), (col_b, None)], vec![(col_a, None)]), ]; + let (_test_schema, eq_properties, ordering_eq_properties) = create_test_params()?; - let eq_classes = eq_properties.classes(); - let ordering_eq_classes = ordering_eq_properties.classes(); for (reqs, expected_normalized) in requirements.into_iter() { let req = convert_to_requirement(&reqs); let expected_normalized = convert_to_requirement(&expected_normalized); assert_eq!( - normalize_sort_requirements(&req, eq_classes, ordering_eq_classes), + normalize_sort_requirements( + &req, + &eq_properties, + &ordering_eq_properties, + ), expected_normalized ); } @@ -1536,10 +1401,7 @@ mod tests { ]; for (expr, expected_eq) in expressions { assert!( - expected_eq.eq(&normalize_expr_with_equivalence_properties( - expr.clone(), - eq_properties.classes() - )), + expected_eq.eq(&eq_properties.normalize_expr(expr.clone())), "error in test: expr: {expr:?}" ); } @@ -1583,10 +1445,7 @@ mod tests { sort_options, ); assert!( - expected.eq(&normalize_sort_requirement_with_equivalence_properties( - arg.clone(), - eq_properties.classes() - )), + expected.eq(&eq_properties.normalize_sort_requirement(arg.clone())), "error in test: expr: {expr:?}, sort_options: {sort_options:?}" ); } @@ -1685,55 +1544,6 @@ mod tests { Ok(()) } - #[test] - fn test_get_compatible_ranges() -> Result<()> { - let col_a = &Column::new("a", 0); - let col_b = &Column::new("b", 1); - let option1 = SortOptions { - descending: false, - nulls_first: false, - }; - let test_data = vec![ - ( - vec![(col_a, Some(option1)), (col_b, Some(option1))], - vec![(col_a, Some(option1))], - vec![(0, 1)], - ), - ( - vec![(col_a, None), (col_b, Some(option1))], - vec![(col_a, Some(option1))], - vec![(0, 1)], - ), - ( - vec![ - (col_a, None), - (col_b, Some(option1)), - (col_a, Some(option1)), - ], - vec![(col_a, Some(option1))], - vec![(0, 1), (2, 3)], - ), - ]; - for (searched, to_search, expected) in test_data { - let searched = convert_to_requirement(&searched); - let to_search = convert_to_requirement(&to_search); - let expected = expected - .into_iter() - .map(|(start, end)| Range { start, end }) - .collect::>(); - assert_eq!(get_compatible_ranges(&searched, &to_search), expected); - } - Ok(()) - } - - #[test] - fn test_collapse_vec() -> Result<()> { - assert_eq!(collapse_vec(vec![1, 2, 3]), vec![1, 2, 3]); - assert_eq!(collapse_vec(vec![1, 2, 3, 2, 3]), vec![1, 2, 3]); - assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]); - Ok(()) - } - #[test] fn test_collect_columns() -> Result<()> { let expr1 = Arc::new(Column::new("col1", 2)) as _; @@ -1940,22 +1750,20 @@ mod tests { Field::new("c", DataType::Int32, true), ]); let mut equal_properties = EquivalenceProperties::new(Arc::new(schema.clone())); - let mut ordering_equal_properties = - OrderingEquivalenceProperties::new(Arc::new(schema.clone())); let mut expected_oeq = OrderingEquivalenceProperties::new(Arc::new(schema)); equal_properties .add_equal_conditions((&Column::new("a", 0), &Column::new("c", 2))); - ordering_equal_properties.add_equal_conditions(( - &vec![PhysicalSortExpr { - expr: Arc::new(Column::new("b", 1)), - options: sort_options, - }], - &vec![PhysicalSortExpr { - expr: Arc::new(Column::new("c", 2)), - options: sort_options, - }], - )); + let head = vec![PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: sort_options, + }]; + let others = vec![vec![PhysicalSortExpr { + expr: Arc::new(Column::new("c", 2)), + options: sort_options, + }]]; + let oeq_class = OrderingEquivalentClass::new(head, others); + expected_oeq.add_equal_conditions(( &vec![PhysicalSortExpr { expr: Arc::new(Column::new("b", 1)), @@ -1967,13 +1775,13 @@ mod tests { }], )); - assert!(!normalize_ordering_equivalence_classes( - ordering_equal_properties.classes(), - &equal_properties, - ) - .iter() - .zip(expected_oeq.classes()) - .any(|(a, b)| a.head().ne(b.head()) || a.others().ne(b.others()))); + let normalized_oeq_class = + oeq_class.normalize_with_equivalence_properties(&equal_properties); + let expected = expected_oeq.oeq_class().unwrap(); + assert!( + normalized_oeq_class.head().eq(expected.head()) + && normalized_oeq_class.others().eq(expected.others()) + ); Ok(()) } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 15208fd0829e..4a8b18914411 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -41,12 +41,13 @@ use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::BinaryExpr; -use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::{ analyze, split_conjunction, AnalysisContext, ExprBoundaries, OrderingEquivalenceProperties, PhysicalExpr, }; +use datafusion_physical_expr::intervals::utils::check_support; +use datafusion_physical_expr::utils::collect_columns; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -153,7 +154,19 @@ impl ExecutionPlan for FilterExec { } fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties { - self.input.ordering_equivalence_properties() + let stats = self.statistics(); + // Add the columns that have only one value (singleton) after filtering to constants. + if let Some(col_stats) = stats.column_statistics { + let constants = collect_columns(self.predicate()) + .into_iter() + .filter(|column| col_stats[column.index()].is_singleton()) + .map(|column| Arc::new(column) as Arc) + .collect::>(); + let filter_oeq = self.input.ordering_equivalence_properties(); + filter_oeq.with_constants(constants) + } else { + self.input.ordering_equivalence_properties() + } } fn with_new_children( @@ -197,7 +210,14 @@ impl ExecutionPlan for FilterExec { let input_stats = self.input.statistics(); let input_column_stats = match input_stats.column_statistics { Some(stats) => stats, - None => return Statistics::default(), + None => self + .schema() + .fields + .iter() + .map(|field| { + ColumnStatistics::new_with_unbounded_column(field.data_type()) + }) + .collect::>(), }; let starter_ctx = diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e33de001df30..67f60e57d7d1 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -44,17 +44,15 @@ use datafusion_common::{ exec_err, plan_err, DataFusionError, JoinType, Result, ScalarValue, SharedResult, }; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::utils::{ - normalize_ordering_equivalence_classes, normalize_sort_exprs, -}; use datafusion_physical_expr::{ - EquivalentClass, LexOrdering, LexOrderingRef, OrderingEquivalenceProperties, - OrderingEquivalentClass, PhysicalExpr, PhysicalSortExpr, + add_offset_to_lex_ordering, EquivalentClass, LexOrdering, LexOrderingRef, + OrderingEquivalenceProperties, OrderingEquivalentClass, PhysicalExpr, + PhysicalSortExpr, }; +use datafusion_physical_expr::utils::merge_vectors; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; -use itertools::Itertools; use parking_lot::Mutex; /// The on clause of the join, as vector of (left, right) columns. @@ -324,66 +322,24 @@ pub fn cross_join_equivalence_properties( /// /// This way; once we normalize an expression according to equivalence properties, /// it can thereafter safely be used for ordering equivalence normalization. -fn get_updated_right_ordering_equivalence_properties( +fn get_updated_right_ordering_equivalent_class( join_type: &JoinType, - right_oeq_classes: &[OrderingEquivalentClass], + right_oeq_class: &OrderingEquivalentClass, left_columns_len: usize, join_eq_properties: &EquivalenceProperties, -) -> Result> { - let updated_oeqs = match join_type { +) -> Result { + match join_type { // In these modes, indices of the right schema should be offset by // the left table size. JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { - add_offset_to_ordering_equivalence_classes( - right_oeq_classes, - left_columns_len, - )? + let right_oeq_class = right_oeq_class.add_offset(left_columns_len)?; + return Ok( + right_oeq_class.normalize_with_equivalence_properties(join_eq_properties) + ); } - _ => right_oeq_classes.to_vec(), + _ => {} }; - - Ok(normalize_ordering_equivalence_classes( - &updated_oeqs, - join_eq_properties, - )) -} - -/// Merge left and right sort expressions, checking for duplicates. -fn merge_vectors( - left: &[PhysicalSortExpr], - right: &[PhysicalSortExpr], -) -> Vec { - left.iter() - .cloned() - .chain(right.iter().cloned()) - .unique() - .collect() -} - -/// Prefix with existing ordering. -fn prefix_ordering_equivalence_with_existing_ordering( - existing_ordering: &[PhysicalSortExpr], - oeq_classes: &[OrderingEquivalentClass], - eq_classes: &[EquivalentClass], -) -> Vec { - let existing_ordering = normalize_sort_exprs(existing_ordering, eq_classes, &[]); - oeq_classes - .iter() - .map(|oeq_class| { - let normalized_head = normalize_sort_exprs(oeq_class.head(), eq_classes, &[]); - let updated_head = merge_vectors(&existing_ordering, &normalized_head); - let updated_others = oeq_class - .others() - .iter() - .map(|ordering| { - let normalized_ordering = - normalize_sort_exprs(ordering, eq_classes, &[]); - merge_vectors(&existing_ordering, &normalized_ordering) - }) - .collect(); - OrderingEquivalentClass::new(updated_head, updated_others) - }) - .collect() + Ok(right_oeq_class.normalize_with_equivalence_properties(join_eq_properties)) } /// Calculate ordering equivalence properties for the given join operation. @@ -411,20 +367,29 @@ pub fn combine_join_ordering_equivalence_properties( )) } (true, false) => { - new_properties.extend(left_oeq_properties.classes().iter().cloned()); + new_properties.extend(left_oeq_properties.oeq_class().cloned()); // In this special case, right side ordering can be prefixed with left side ordering. - if probe_side == Some(JoinSide::Left) - && right.output_ordering().is_some() - && *join_type == JoinType::Inner - { - let right_oeq_classes = - get_updated_right_ordering_equivalence_properties( - join_type, - right_oeq_properties.classes(), - left_columns_len, - &join_eq_properties, - )?; + if let ( + Some(JoinSide::Left), + // right side have an ordering + Some(_), + JoinType::Inner, + Some(oeq_class), + ) = ( + probe_side, + right.output_ordering(), + join_type, + right_oeq_properties.oeq_class(), + ) { let left_output_ordering = left.output_ordering().unwrap_or(&[]); + + let updated_right_oeq = get_updated_right_ordering_equivalent_class( + join_type, + oeq_class, + left_columns_len, + &join_eq_properties, + )?; + // Right side ordering equivalence properties should be prepended with // those of the left side while constructing output ordering equivalence // properties since stream side is the left side. @@ -433,32 +398,44 @@ pub fn combine_join_ordering_equivalence_properties( // ordering of the left table is `a ASC`, then the ordering equivalence `b ASC` // for the right table should be converted to `a ASC, b ASC` before it is added // to the ordering equivalences of the join. - let updated_right_oeq_classes = - prefix_ordering_equivalence_with_existing_ordering( + let updated_right_oeq_class = updated_right_oeq + .prefix_ordering_equivalent_class_with_existing_ordering( left_output_ordering, - &right_oeq_classes, - join_eq_properties.classes(), + &join_eq_properties, ); - new_properties.extend(updated_right_oeq_classes); + new_properties.extend(Some(updated_right_oeq_class)); } } (false, true) => { - let right_oeq_classes = get_updated_right_ordering_equivalence_properties( - join_type, - right_oeq_properties.classes(), - left_columns_len, - &join_eq_properties, - )?; - new_properties.extend(right_oeq_classes); + let updated_right_oeq = right_oeq_properties + .oeq_class() + .map(|right_oeq_class| { + get_updated_right_ordering_equivalent_class( + join_type, + right_oeq_class, + left_columns_len, + &join_eq_properties, + ) + }) + .transpose()?; + new_properties.extend(updated_right_oeq); // In this special case, left side ordering can be prefixed with right side ordering. - if probe_side == Some(JoinSide::Right) - && left.output_ordering().is_some() - && *join_type == JoinType::Inner - { - let left_oeq_classes = left_oeq_properties.classes(); + if let ( + Some(JoinSide::Right), + // left side have an ordering + Some(_), + JoinType::Inner, + Some(left_oeq_class), + ) = ( + probe_side, + left.output_ordering(), + join_type, + left_oeq_properties.oeq_class(), + ) { let right_output_ordering = right.output_ordering().unwrap_or(&[]); let right_output_ordering = add_offset_to_lex_ordering(right_output_ordering, left_columns_len)?; + // Left side ordering equivalence properties should be prepended with // those of the right side while constructing output ordering equivalence // properties since stream side is the right side. @@ -467,13 +444,12 @@ pub fn combine_join_ordering_equivalence_properties( // ordering of the left table is `a ASC`, then the ordering equivalence `b ASC` // for the right table should be converted to `a ASC, b ASC` before it is added // to the ordering equivalences of the join. - let updated_left_oeq_classes = - prefix_ordering_equivalence_with_existing_ordering( + let updated_left_oeq_class = left_oeq_class + .prefix_ordering_equivalent_class_with_existing_ordering( &right_output_ordering, - left_oeq_classes, - join_eq_properties.classes(), + &join_eq_properties, ); - new_properties.extend(updated_left_oeq_classes); + new_properties.extend(Some(updated_left_oeq_class)); } } (false, false) => {} @@ -481,64 +457,6 @@ pub fn combine_join_ordering_equivalence_properties( Ok(new_properties) } -/// Adds the `offset` value to `Column` indices inside `expr`. This function is -/// generally used during the update of the right table schema in join operations. -pub(crate) fn add_offset_to_expr( - expr: Arc, - offset: usize, -) -> Result> { - expr.transform_down(&|e| match e.as_any().downcast_ref::() { - Some(col) => Ok(Transformed::Yes(Arc::new(Column::new( - col.name(), - offset + col.index(), - )))), - None => Ok(Transformed::No(e)), - }) -} - -/// Adds the `offset` value to `Column` indices inside `sort_expr.expr`. -pub(crate) fn add_offset_to_sort_expr( - sort_expr: &PhysicalSortExpr, - offset: usize, -) -> Result { - Ok(PhysicalSortExpr { - expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?, - options: sort_expr.options, - }) -} - -/// Adds the `offset` value to `Column` indices for each `sort_expr.expr` -/// inside `sort_exprs`. -pub(crate) fn add_offset_to_lex_ordering( - sort_exprs: LexOrderingRef, - offset: usize, -) -> Result { - sort_exprs - .iter() - .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset)) - .collect() -} - -/// Adds the `offset` value to `Column` indices for all expressions inside the -/// given `OrderingEquivalentClass`es. -pub(crate) fn add_offset_to_ordering_equivalence_classes( - oeq_classes: &[OrderingEquivalentClass], - offset: usize, -) -> Result> { - oeq_classes - .iter() - .map(|prop| { - let new_head = add_offset_to_lex_ordering(prop.head(), offset)?; - let new_others = prop - .others() - .iter() - .map(|ordering| add_offset_to_lex_ordering(ordering, offset)) - .collect::>>()?; - Ok(OrderingEquivalentClass::new(new_head, new_others)) - }) - .collect() -} - impl Display for JoinSide { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -1920,7 +1838,7 @@ mod tests { let join_type = JoinType::Inner; let options = SortOptions::default(); - let right_oeq_classes = OrderingEquivalentClass::new( + let right_oeq_class = OrderingEquivalentClass::new( vec![ PhysicalSortExpr { expr: Arc::new(Column::new("x", 0)), @@ -1957,9 +1875,9 @@ mod tests { join_eq_properties .add_equal_conditions((&Column::new("d", 3), &Column::new("w", 7))); - let result = get_updated_right_ordering_equivalence_properties( + let result = get_updated_right_ordering_equivalent_class( &join_type, - &[right_oeq_classes], + &right_oeq_class, left_columns_len, &join_eq_properties, )?; @@ -1987,8 +1905,8 @@ mod tests { ]], ); - assert_eq!(result[0].head(), expected.head()); - assert_eq!(result[0].others(), expected.others()); + assert_eq!(result.head(), expected.head()); + assert_eq!(result.others(), expected.others()); Ok(()) } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index d36d93d29edd..b29c8e9c7bd9 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -296,9 +296,9 @@ mod tests { assert_eq!(mem_exec.output_ordering().unwrap(), expected_output_order); let order_eq = mem_exec.ordering_equivalence_properties(); assert!(order_eq - .classes() - .iter() - .any(|class| class.contains(&expected_order_eq))); + .oeq_class() + .map(|class| class.contains(&expected_order_eq)) + .unwrap_or(false)); Ok(()) } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index f1ec0a68a6e7..4fc48e971ca9 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -40,11 +40,11 @@ use datafusion_common::Result; use datafusion_execution::TaskContext; use datafusion_physical_expr::expressions::{Literal, UnKnownColumn}; use datafusion_physical_expr::{ - find_orderings_of_exprs, normalize_out_expr_with_columns_map, - project_equivalence_properties, project_ordering_equivalence_properties, - OrderingEquivalenceProperties, + normalize_out_expr_with_columns_map, project_equivalence_properties, + project_ordering_equivalence_properties, OrderingEquivalenceProperties, }; +use datafusion_physical_expr::utils::find_orderings_of_exprs; use futures::stream::{Stream, StreamExt}; use log::trace; diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index a7ed2bf5c743..b09910735809 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -888,6 +888,112 @@ physical_plan ProjectionExec: expr=[a@0 as a, b@1 as b, 2 as Int64(2)] --CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true +# source is ordered by a,b,c +# when filter result is constant for column a +# ordering b, c is still satisfied. Final plan shouldn't have +# SortExec. +query TT +EXPLAIN SELECT * +FROM annotated_data_finite2 +WHERE a=0 +ORDER BY b, c; +---- +logical_plan +Sort: annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST +--Filter: annotated_data_finite2.a = Int32(0) +----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0)] +physical_plan +SortPreservingMergeExec: [b@2 ASC NULLS LAST,c@3 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: a@1 = 0 +------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true + +# source is ordered by a,b,c +# when filter result is constant for column a and b +# ordering c is still satisfied. Final plan shouldn't have +# SortExec. +query TT +EXPLAIN SELECT * +FROM annotated_data_finite2 +WHERE a=0 and b=0 +ORDER BY c; +---- +logical_plan +Sort: annotated_data_finite2.c ASC NULLS LAST +--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = Int32(0) +----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] +physical_plan +SortPreservingMergeExec: [c@3 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: a@1 = 0 AND b@2 = 0 +------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true + +# source is ordered by a,b,c +# when filter result is constant for column a and b +# ordering b, c is still satisfied. Final plan shouldn't have +# SortExec. +query TT +EXPLAIN SELECT * +FROM annotated_data_finite2 +WHERE a=0 and b=0 +ORDER BY b, c; +---- +logical_plan +Sort: annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST +--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = Int32(0) +----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] +physical_plan +SortPreservingMergeExec: [b@2 ASC NULLS LAST,c@3 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: a@1 = 0 AND b@2 = 0 +------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true + +# source is ordered by a,b,c +# when filter result is constant for column a and b +# ordering a, b, c is still satisfied. Final plan shouldn't have +# SortExec. +query TT +EXPLAIN SELECT * +FROM annotated_data_finite2 +WHERE a=0 and b=0 +ORDER BY a, b, c; +---- +logical_plan +Sort: annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST +--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = Int32(0) +----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] +physical_plan +SortPreservingMergeExec: [a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: a@1 = 0 AND b@2 = 0 +------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true + +# source is ordered by a,b,c +# when filter result is when filter contains or +# column a, and b may not be constant. Hence final plan +# should contain SortExec +query TT +EXPLAIN SELECT * +FROM annotated_data_finite2 +WHERE a=0 or b=0 +ORDER BY c; +---- +logical_plan +Sort: annotated_data_finite2.c ASC NULLS LAST +--Filter: annotated_data_finite2.a = Int32(0) OR annotated_data_finite2.b = Int32(0) +----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0) OR annotated_data_finite2.b = Int32(0)] +physical_plan +SortPreservingMergeExec: [c@3 ASC NULLS LAST] +--SortExec: expr=[c@3 ASC NULLS LAST] +----CoalesceBatchesExec: target_batch_size=8192 +------FilterExec: a@1 = 0 OR b@2 = 0 +--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true + statement ok drop table annotated_data_finite2; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index fe074da1bba0..2eccb60aad3e 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -284,19 +284,20 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum ------------TableScan: t2 projection=[t2_id, t2_int] physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum] ---CoalesceBatchesExec: target_batch_size=8192 -----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)] -------CoalesceBatchesExec: target_batch_size=8192 ---------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id] +--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), t2_id@1 as t2_id] +----CoalesceBatchesExec: target_batch_size=8192 +------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)] +--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id] +----------CoalesceBatchesExec: target_batch_size=8192 +------------FilterExec: SUM(t2.t2_int)@1 < 3 +--------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)] +----------------CoalesceBatchesExec: target_batch_size=8192 +------------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 +--------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)] +----------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] --------CoalesceBatchesExec: target_batch_size=8192 -----------FilterExec: SUM(t2.t2_int)@1 < 3 -------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)] ---------------CoalesceBatchesExec: target_batch_size=8192 -----------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 -------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)] ---------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 +------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] query II rowsort SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1 diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index f8f8f30ade7f..3d9f7511be26 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -2342,11 +2342,10 @@ Limit: skip=0, fetch=5 ----------TableScan: aggregate_test_100 projection=[c9] physical_plan GlobalLimitExec: skip=0, fetch=5 ---SortExec: fetch=5, expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST] -----ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1] -------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted] ---------SortExec: expr=[c9@0 DESC] -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true +--ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1] +----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------SortExec: expr=[c9@0 DESC] +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true query II SELECT c9, rn1 FROM (SELECT c9,