diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 4a562f4ef101..defd7b5786a3 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::hash::Hash; +use std::collections::{HashMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::expressions::{Column, Literal}; @@ -26,12 +27,14 @@ use crate::{ LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; + use arrow::datatypes::SchemaRef; use arrow_schema::SortOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{JoinSide, JoinType, Result}; use indexmap::IndexSet; +use itertools::Itertools; /// An `EquivalenceClass` is a set of [`Arc`]s that are known /// to have the same value for all tuples in a relation. These are generated by @@ -465,31 +468,6 @@ impl EquivalenceGroup { .map(|children| expr.clone().with_new_children(children).unwrap()) } - /// Projects `ordering` according to the given projection mapping. - /// If the resulting ordering is invalid after projection, returns `None`. - fn project_ordering( - &self, - mapping: &ProjectionMapping, - ordering: LexOrderingRef, - ) -> Option { - // If any sort expression is invalid after projection, rest of the - // ordering shouldn't be projected either. For example, if input ordering - // is [a ASC, b ASC, c ASC], and column b is not valid after projection, - // the result should be [a ASC], not [a ASC, c ASC], even if column c is - // valid after projection. - let result = ordering - .iter() - .map_while(|sort_expr| { - self.project_expr(mapping, &sort_expr.expr) - .map(|expr| PhysicalSortExpr { - expr, - options: sort_expr.options, - }) - }) - .collect::>(); - (!result.is_empty()).then_some(result) - } - /// Projects this equivalence group according to the given projection mapping. pub fn project(&self, mapping: &ProjectionMapping) -> Self { let projected_classes = self.iter().filter_map(|cls| { @@ -724,8 +702,21 @@ impl OrderingEquivalenceClass { // Append orderings in `other` to all existing orderings in this equivalence // class. pub fn join_suffix(mut self, other: &Self) -> Self { - for ordering in other.iter() { - for idx in 0..self.orderings.len() { + let n_ordering = self.orderings.len(); + // Replicate entries before cross product + let n_cross = std::cmp::max(n_ordering, other.len() * n_ordering); + self.orderings = self + .orderings + .iter() + .cloned() + .cycle() + .take(n_cross) + .collect(); + // Suffix orderings of other to the current orderings. + for (outer_idx, ordering) in other.iter().enumerate() { + for idx in 0..n_ordering { + // Calculate cross product index + let idx = outer_idx * n_ordering + idx; self.orderings[idx].extend(ordering.iter().cloned()); } } @@ -1196,6 +1187,181 @@ impl EquivalenceProperties { self.eq_group.project_expr(projection_mapping, expr) } + /// Constructs a dependency map based on existing orderings referred to in + /// the projection. + /// + /// This function analyzes the orderings in the normalized order-equivalence + /// class and builds a dependency map. The dependency map captures relationships + /// between expressions within the orderings, helping to identify dependencies + /// and construct valid projected orderings during projection operations. + /// + /// # Parameters + /// + /// - `mapping`: A reference to the `ProjectionMapping` that defines the + /// relationship between source and target expressions. + /// + /// # Returns + /// + /// A [`DependencyMap`] representing the dependency map, where each + /// [`DependencyNode`] contains dependencies for the key [`PhysicalSortExpr`]. + /// + /// # Example + /// + /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`, + /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`. + /// Then, the dependency map will be: + /// + /// ```text + /// a ASC: Node {Some(a_new ASC), HashSet{}} + /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}} + /// c ASC: Node {None, HashSet{a ASC}} + /// ``` + fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap { + let mut dependency_map = HashMap::new(); + for ordering in self.normalized_oeq_class().iter() { + for (idx, sort_expr) in ordering.iter().enumerate() { + let target_sort_expr = + self.project_expr(&sort_expr.expr, mapping).map(|expr| { + PhysicalSortExpr { + expr, + options: sort_expr.options, + } + }); + let is_projected = target_sort_expr.is_some(); + if is_projected + || mapping + .iter() + .any(|(source, _)| expr_refers(source, &sort_expr.expr)) + { + // Previous ordering is a dependency. Note that there is no, + // dependency for a leading ordering (i.e. the first sort + // expression). + let dependency = idx.checked_sub(1).map(|a| &ordering[a]); + // Add sort expressions that can be projected or referred to + // by any of the projection expressions to the dependency map: + dependency_map + .entry(sort_expr.clone()) + .or_insert_with(|| DependencyNode { + target_sort_expr: target_sort_expr.clone(), + dependencies: HashSet::new(), + }) + .insert_dependency(dependency); + } + if !is_projected { + // If we can not project, stop constructing the dependency + // map as remaining dependencies will be invalid after projection. + break; + } + } + } + dependency_map + } + + /// Returns a new `ProjectionMapping` where source expressions are normalized. + /// + /// This normalization ensures that source expressions are transformed into a + /// consistent representation. This is beneficial for algorithms that rely on + /// exact equalities, as it allows for more precise and reliable comparisons. + /// + /// # Parameters + /// + /// - `mapping`: A reference to the original `ProjectionMapping` to be normalized. + /// + /// # Returns + /// + /// A new `ProjectionMapping` with normalized source expressions. + fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping { + // Construct the mapping where source expressions are normalized. In this way + // In the algorithms below we can work on exact equalities + ProjectionMapping { + map: mapping + .iter() + .map(|(source, target)| { + let normalized_source = self.eq_group.normalize_expr(source.clone()); + (normalized_source, target.clone()) + }) + .collect(), + } + } + + /// Computes projected orderings based on a given projection mapping. + /// + /// This function takes a `ProjectionMapping` and computes the possible + /// orderings for the projected expressions. It considers dependencies + /// between expressions and generates valid orderings according to the + /// specified sort properties. + /// + /// # Parameters + /// + /// - `mapping`: A reference to the `ProjectionMapping` that defines the + /// relationship between source and target expressions. + /// + /// # Returns + /// + /// A vector of `LexOrdering` containing all valid orderings after projection. + fn projected_orderings(&self, mapping: &ProjectionMapping) -> Vec { + let mapping = self.normalized_mapping(mapping); + + // Get dependency map for existing orderings: + let dependency_map = self.construct_dependency_map(&mapping); + + let orderings = mapping.iter().flat_map(|(source, target)| { + referred_dependencies(&dependency_map, source) + .into_iter() + .filter_map(|relevant_deps| { + if let SortProperties::Ordered(options) = + get_expr_ordering(source, &relevant_deps) + { + Some((options, relevant_deps)) + } else { + // Do not consider unordered cases + None + } + }) + .flat_map(|(options, relevant_deps)| { + let sort_expr = PhysicalSortExpr { + expr: target.clone(), + options, + }; + // Generate dependent orderings (i.e. prefixes for `sort_expr`): + let mut dependency_orderings = + generate_dependency_orderings(&relevant_deps, &dependency_map); + // Append `sort_expr` to the dependent orderings: + for ordering in dependency_orderings.iter_mut() { + ordering.push(sort_expr.clone()); + } + dependency_orderings + }) + }); + + // Add valid projected orderings. For example, if existing ordering is + // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to + // preserve `a_new + b_new` as ordered. Please note that `a_new` and + // `b_new` themselves need not be ordered. Such dependencies cannot be + // deduced via the pass above. + let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| { + let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map); + if prefixes.is_empty() { + // If prefix is empty, there is no dependency. Insert + // empty ordering: + prefixes = vec![vec![]]; + } + // Append current ordering on top its dependencies: + for ordering in prefixes.iter_mut() { + if let Some(target) = &node.target_sort_expr { + ordering.push(target.clone()) + } + } + prefixes + }); + + // Simplify each ordering by removing redundant sections: + orderings + .chain(projected_orderings) + .map(collapse_lex_ordering) + .collect() + } + /// Projects constants based on the provided `ProjectionMapping`. /// /// This function takes a `ProjectionMapping` and identifies/projects @@ -1240,28 +1406,13 @@ impl EquivalenceProperties { projection_mapping: &ProjectionMapping, output_schema: SchemaRef, ) -> Self { - let mut projected_orderings = self - .oeq_class - .iter() - .filter_map(|order| self.eq_group.project_ordering(projection_mapping, order)) - .collect::>(); - for (source, target) in projection_mapping.iter() { - let expr_ordering = ExprOrdering::new(source.clone()) - .transform_up(&|expr| Ok(update_ordering(expr, self))) - // Guaranteed to always return `Ok`. - .unwrap(); - if let SortProperties::Ordered(options) = expr_ordering.state { - // Push new ordering to the state. - projected_orderings.push(vec![PhysicalSortExpr { - expr: target.clone(), - options, - }]); - } - } + let projected_constants = self.projected_constants(projection_mapping); + let projected_eq_group = self.eq_group.project(projection_mapping); + let projected_orderings = self.projected_orderings(projection_mapping); Self { - eq_group: self.eq_group.project(projection_mapping), + eq_group: projected_eq_group, oeq_class: OrderingEquivalenceClass::new(projected_orderings), - constants: self.projected_constants(projection_mapping), + constants: projected_constants, schema: output_schema, } } @@ -1397,6 +1548,270 @@ fn is_constant_recurse( !children.is_empty() && children.iter().all(|c| is_constant_recurse(constants, c)) } +/// This function examines whether a referring expression directly refers to a +/// given referred expression or if any of its children in the expression tree +/// refer to the specified expression. +/// +/// # Parameters +/// +/// - `referring_expr`: A reference to the referring expression (`Arc`). +/// - `referred_expr`: A reference to the referred expression (`Arc`) +/// +/// # Returns +/// +/// A boolean value indicating whether `referring_expr` refers (needs it to evaluate its result) +/// `referred_expr` or not. +fn expr_refers( + referring_expr: &Arc, + referred_expr: &Arc, +) -> bool { + referring_expr.eq(referred_expr) + || referring_expr + .children() + .iter() + .any(|child| expr_refers(child, referred_expr)) +} + +/// Wrapper struct for `Arc` to use them as keys in a hash map. +#[derive(Debug, Clone)] +struct ExprWrapper(Arc); + +impl PartialEq for ExprWrapper { + fn eq(&self, other: &Self) -> bool { + self.0.eq(&other.0) + } +} + +impl Eq for ExprWrapper {} + +impl Hash for ExprWrapper { + fn hash(&self, state: &mut H) { + self.0.hash(state); + } +} + +/// This function analyzes the dependency map to collect referred dependencies for +/// a given source expression. +/// +/// # Parameters +/// +/// - `dependency_map`: A reference to the `DependencyMap` where each +/// `PhysicalSortExpr` is associated with a `DependencyNode`. +/// - `source`: A reference to the source expression (`Arc`) +/// for which relevant dependencies need to be identified. +/// +/// # Returns +/// +/// A `Vec` containing the dependencies for the given source +/// expression. These dependencies are expressions that are referred to by +/// the source expression based on the provided dependency map. +fn referred_dependencies( + dependency_map: &DependencyMap, + source: &Arc, +) -> Vec { + // Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them: + let mut expr_to_sort_exprs = HashMap::::new(); + for sort_expr in dependency_map + .keys() + .filter(|sort_expr| expr_refers(source, &sort_expr.expr)) + { + let key = ExprWrapper(sort_expr.expr.clone()); + expr_to_sort_exprs + .entry(key) + .or_default() + .insert(sort_expr.clone()); + } + + // Generate all valid dependencies for the source. For example, if the source + // is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get + // `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`. + expr_to_sort_exprs + .values() + .multi_cartesian_product() + .map(|referred_deps| referred_deps.into_iter().cloned().collect()) + .collect() +} + +/// This function recursively analyzes the dependencies of the given sort +/// expression within the given dependency map to construct lexicographical +/// orderings that include the sort expression and its dependencies. +/// +/// # Parameters +/// +/// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`) +/// for which lexicographical orderings satisfying its dependencies are to be +/// constructed. +/// - `dependency_map`: A reference to the `DependencyMap` that contains +/// dependencies for different `PhysicalSortExpr`s. +/// +/// # Returns +/// +/// A vector of lexicographical orderings (`Vec`) based on the given +/// sort expression and its dependencies. +fn construct_orderings( + referred_sort_expr: &PhysicalSortExpr, + dependency_map: &DependencyMap, +) -> Vec { + // We are sure that `referred_sort_expr` is inside `dependency_map`. + let node = &dependency_map[referred_sort_expr]; + // Since we work on intermediate nodes, we are sure `val.target_sort_expr` + // exists. + let target_sort_expr = node.target_sort_expr.clone().unwrap(); + if node.dependencies.is_empty() { + vec![vec![target_sort_expr]] + } else { + node.dependencies + .iter() + .flat_map(|dep| { + let mut orderings = construct_orderings(dep, dependency_map); + for ordering in orderings.iter_mut() { + ordering.push(target_sort_expr.clone()) + } + orderings + }) + .collect() + } +} + +/// This function retrieves the dependencies of the given relevant sort expression +/// from the given dependency map. It then constructs prefix orderings by recursively +/// analyzing the dependencies and include them in the orderings. +/// +/// # Parameters +/// +/// - `relevant_sort_expr`: A reference to the relevant sort expression +/// (`PhysicalSortExpr`) for which prefix orderings are to be constructed. +/// - `dependency_map`: A reference to the `DependencyMap` containing dependencies. +/// +/// # Returns +/// +/// A vector of prefix orderings (`Vec`) based on the given relevant +/// sort expression and its dependencies. +fn construct_prefix_orderings( + relevant_sort_expr: &PhysicalSortExpr, + dependency_map: &DependencyMap, +) -> Vec { + dependency_map[relevant_sort_expr] + .dependencies + .iter() + .flat_map(|dep| construct_orderings(dep, dependency_map)) + .collect() +} + +/// Given a set of relevant dependencies (`relevant_deps`) and a map of dependencies +/// (`dependency_map`), this function generates all possible prefix orderings +/// based on the given dependencies. +/// +/// # Parameters +/// +/// * `dependencies` - A reference to the dependencies. +/// * `dependency_map` - A reference to the map of dependencies for expressions. +/// +/// # Returns +/// +/// A vector of lexical orderings (`Vec`) representing all valid orderings +/// based on the given dependencies. +fn generate_dependency_orderings( + dependencies: &Dependencies, + dependency_map: &DependencyMap, +) -> Vec { + // Construct all the valid prefix orderings for each expression appearing + // in the projection: + let relevant_prefixes = dependencies + .iter() + .flat_map(|dep| { + let prefixes = construct_prefix_orderings(dep, dependency_map); + (!prefixes.is_empty()).then_some(prefixes) + }) + .collect::>(); + + // No dependency, dependent is a leading ordering. + if relevant_prefixes.is_empty() { + // Return an empty ordering: + return vec![vec![]]; + } + + // Generate all possible orderings where dependencies are satisfied for the + // current projection expression. For example, if expression is `a + b ASC`, + // and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC` + // is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and + // `[d DESC, c ASC, a + b ASC]`. + relevant_prefixes + .into_iter() + .multi_cartesian_product() + .flat_map(|prefix_orderings| { + prefix_orderings + .iter() + .permutations(prefix_orderings.len()) + .map(|prefixes| prefixes.into_iter().flatten().cloned().collect()) + .collect::>() + }) + .collect() +} + +/// This function examines the given expression and the sort expressions it +/// refers to determine the ordering properties of the expression. +/// +/// # Parameters +/// +/// - `expr`: A reference to the source expression (`Arc`) for +/// which ordering properties need to be determined. +/// - `dependencies`: A reference to `Dependencies`, containing sort expressions +/// referred to by `expr`. +/// +/// # Returns +/// +/// A `SortProperties` indicating the ordering information of the given expression. +fn get_expr_ordering( + expr: &Arc, + dependencies: &Dependencies, +) -> SortProperties { + if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) { + // If exact match is found, return its ordering. + SortProperties::Ordered(column_order.options) + } else { + // Find orderings of its children + let child_states = expr + .children() + .iter() + .map(|child| get_expr_ordering(child, dependencies)) + .collect::>(); + // Calculate expression ordering using ordering of its children. + expr.get_ordering(&child_states) + } +} + +/// Represents a node in the dependency map used to construct projected orderings. +/// +/// A `DependencyNode` contains information about a particular sort expression, +/// including its target sort expression and a set of dependencies on other sort +/// expressions. +/// +/// # Fields +/// +/// - `target_sort_expr`: An optional `PhysicalSortExpr` representing the target +/// sort expression associated with the node. It is `None` if the sort expression +/// cannot be projected. +/// - `dependencies`: A [`Dependencies`] containing dependencies on other sort +/// expressions that are referred to by the target sort expression. +#[derive(Debug, Clone, PartialEq, Eq)] +struct DependencyNode { + target_sort_expr: Option, + dependencies: Dependencies, +} + +impl DependencyNode { + // Insert dependency to the state (if exists). + fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) { + if let Some(dep) = dependency { + self.dependencies.insert(dep.clone()); + } + } +} + +type DependencyMap = HashMap; +type Dependencies = HashSet; + /// Calculate ordering equivalence properties for the given join operation. pub fn join_equivalence_properties( left: EquivalenceProperties, @@ -1544,7 +1959,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, Float64Array, RecordBatch, UInt32Array}; use arrow_schema::{Fields, SortOptions, TimeUnit}; - use datafusion_common::{Result, ScalarValue}; + use datafusion_common::{plan_datafusion_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::{BuiltinScalarFunction, Operator}; use itertools::{izip, Itertools}; @@ -1552,6 +1967,37 @@ mod tests { use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; + fn output_schema( + mapping: &ProjectionMapping, + input_schema: &Arc, + ) -> Result { + // Calculate output schema + let fields: Result> = mapping + .iter() + .map(|(source, target)| { + let name = target + .as_any() + .downcast_ref::() + .ok_or_else(|| plan_datafusion_err!("Expects to have column"))? + .name(); + let field = Field::new( + name, + source.data_type(input_schema)?, + source.nullable(input_schema)?, + ); + + Ok(field) + }) + .collect(); + + let output_schema = Arc::new(Schema::new_with_metadata( + fields?, + input_schema.metadata().clone(), + )); + + Ok(output_schema) + } + // Generate a schema which consists of 8 columns (a, b, c, d, e, f, g, h) fn create_test_schema() -> Result { let a = Field::new("a", DataType::Int32, true); @@ -1679,7 +2125,7 @@ mod tests { .map(|(expr, options)| { PhysicalSortRequirement::new((*expr).clone(), *options) }) - .collect::>() + .collect() } // Convert each tuple to PhysicalSortExpr @@ -1692,7 +2138,7 @@ mod tests { expr: (*expr).clone(), options: *options, }) - .collect::>() + .collect() } // Convert each inner tuple to PhysicalSortExpr @@ -1705,6 +2151,56 @@ mod tests { .collect() } + // Convert each tuple to PhysicalSortExpr + fn convert_to_sort_exprs_owned( + in_data: &[(Arc, SortOptions)], + ) -> Vec { + in_data + .iter() + .map(|(expr, options)| PhysicalSortExpr { + expr: (*expr).clone(), + options: *options, + }) + .collect() + } + + // Convert each inner tuple to PhysicalSortExpr + fn convert_to_orderings_owned( + orderings: &[Vec<(Arc, SortOptions)>], + ) -> Vec> { + orderings + .iter() + .map(|sort_exprs| convert_to_sort_exprs_owned(sort_exprs)) + .collect() + } + + // Apply projection to the input_data, return projected equivalence properties and record batch + fn apply_projection( + proj_exprs: Vec<(Arc, String)>, + input_data: &RecordBatch, + input_eq_properties: &EquivalenceProperties, + ) -> Result<(RecordBatch, EquivalenceProperties)> { + let input_schema = input_data.schema(); + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?; + + let output_schema = output_schema(&projection_mapping, &input_schema)?; + let num_rows = input_data.num_rows(); + // Apply projection to the input record batch. + let projected_values = projection_mapping + .iter() + .map(|(source, _target)| source.evaluate(input_data)?.into_array(num_rows)) + .collect::>>()?; + let projected_batch = if projected_values.is_empty() { + RecordBatch::new_empty(output_schema.clone()) + } else { + RecordBatch::try_new(output_schema.clone(), projected_values)? + }; + + let projected_eq = + input_eq_properties.project(&projection_mapping, output_schema); + Ok((projected_batch, projected_eq)) + } + #[test] fn add_equal_conditions_test() -> Result<()> { let schema = Arc::new(Schema::new(vec![ @@ -1774,13 +2270,16 @@ mod tests { let input_properties = EquivalenceProperties::new(input_schema.clone()); let col_a = col("a", &input_schema)?; - let out_schema = Arc::new(Schema::new(vec![ - Field::new("a1", DataType::Int64, true), - Field::new("a2", DataType::Int64, true), - Field::new("a3", DataType::Int64, true), - Field::new("a4", DataType::Int64, true), - ])); + // a as a1, a as a2, a as a3, a as a3 + let proj_exprs = vec![ + (col_a.clone(), "a1".to_string()), + (col_a.clone(), "a2".to_string()), + (col_a.clone(), "a3".to_string()), + (col_a.clone(), "a4".to_string()), + ]; + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?; + let out_schema = output_schema(&projection_mapping, &input_schema)?; // a as a1, a as a2, a as a3, a as a3 let proj_exprs = vec![ (col_a.clone(), "a1".to_string()), @@ -3686,30 +4185,1143 @@ mod tests { } #[test] - fn test_expr_consists_of_constants() -> Result<()> { + fn project_orderings() -> Result<()> { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Int32, true), Field::new("c", DataType::Int32, true), Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true), ])); - let col_a = col("a", &schema)?; - let col_b = col("b", &schema)?; - let col_d = col("d", &schema)?; + let col_a = &col("a", &schema)?; + let col_b = &col("b", &schema)?; + let col_c = &col("c", &schema)?; + let col_d = &col("d", &schema)?; + let col_e = &col("e", &schema)?; + let col_ts = &col("ts", &schema)?; + let interval = Arc::new(Literal::new(ScalarValue::IntervalDayTime(Some(2)))) + as Arc; + let date_bin_func = &create_physical_expr( + &BuiltinScalarFunction::DateBin, + &[interval, col_ts.clone()], + &schema, + &ExecutionProps::default(), + )?; + let a_plus_b = Arc::new(BinaryExpr::new( + col_a.clone(), + Operator::Plus, + col_b.clone(), + )) as Arc; let b_plus_d = Arc::new(BinaryExpr::new( col_b.clone(), Operator::Plus, col_d.clone(), )) as Arc; + let b_plus_e = Arc::new(BinaryExpr::new( + col_b.clone(), + Operator::Plus, + col_e.clone(), + )) as Arc; + let c_plus_d = Arc::new(BinaryExpr::new( + col_c.clone(), + Operator::Plus, + col_d.clone(), + )) as Arc; - let constants = vec![col_a.clone(), col_b.clone()]; - let expr = b_plus_d.clone(); - assert!(!is_constant_recurse(&constants, &expr)); + let option_asc = SortOptions { + descending: false, + nulls_first: false, + }; + let option_desc = SortOptions { + descending: true, + nulls_first: true, + }; - let constants = vec![col_a.clone(), col_b.clone(), col_d.clone()]; - let expr = b_plus_d.clone(); - assert!(is_constant_recurse(&constants, &expr)); + let test_cases = vec![ + // ---------- TEST CASE 1 ------------ + ( + // orderings + vec![ + // [b ASC] + vec![(col_b, option_asc)], + ], + // projection exprs + vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())], + // expected + vec![ + // [b_new ASC] + vec![("b_new", option_asc)], + ], + ), + // ---------- TEST CASE 2 ------------ + ( + // orderings + vec![ + // empty ordering + ], + // projection exprs + vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())], + // expected + vec![ + // no ordering at the output + ], + ), + // ---------- TEST CASE 3 ------------ + ( + // orderings + vec![ + // [ts ASC] + vec![(col_ts, option_asc)], + ], + // projection exprs + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (col_ts, "ts_new".to_string()), + (date_bin_func, "date_bin_res".to_string()), + ], + // expected + vec![ + // [date_bin_res ASC] + vec![("date_bin_res", option_asc)], + // [ts_new ASC] + vec![("ts_new", option_asc)], + ], + ), + // ---------- TEST CASE 4 ------------ + ( + // orderings + vec![ + // [a ASC, ts ASC] + vec![(col_a, option_asc), (col_ts, option_asc)], + // [b ASC, ts ASC] + vec![(col_b, option_asc), (col_ts, option_asc)], + ], + // projection exprs + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (col_ts, "ts_new".to_string()), + (date_bin_func, "date_bin_res".to_string()), + ], + // expected + vec![ + // [a_new ASC, ts_new ASC] + vec![("a_new", option_asc), ("ts_new", option_asc)], + // [a_new ASC, date_bin_res ASC] + vec![("a_new", option_asc), ("date_bin_res", option_asc)], + // [b_new ASC, ts_new ASC] + vec![("b_new", option_asc), ("ts_new", option_asc)], + // [b_new ASC, date_bin_res ASC] + vec![("b_new", option_asc), ("date_bin_res", option_asc)], + ], + ), + // ---------- TEST CASE 5 ------------ + ( + // orderings + vec![ + // [a + b ASC] + vec![(&a_plus_b, option_asc)], + ], + // projection exprs + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (&a_plus_b, "a+b".to_string()), + ], + // expected + vec![ + // [a + b ASC] + vec![("a+b", option_asc)], + ], + ), + // ---------- TEST CASE 6 ------------ + ( + // orderings + vec![ + // [a + b ASC, c ASC] + vec![(&a_plus_b, option_asc), (&col_c, option_asc)], + ], + // projection exprs + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (col_c, "c_new".to_string()), + (&a_plus_b, "a+b".to_string()), + ], + // expected + vec![ + // [a + b ASC, c_new ASC] + vec![("a+b", option_asc), ("c_new", option_asc)], + ], + ), + // ------- TEST CASE 7 ---------- + ( + vec![ + // [a ASC, b ASC, c ASC] + vec![(col_a, option_asc), (col_b, option_asc)], + // [a ASC, d ASC] + vec![(col_a, option_asc), (col_d, option_asc)], + ], + // b as b_new, a as a_new, d as d_new b+d + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (col_d, "d_new".to_string()), + (&b_plus_d, "b+d".to_string()), + ], + // expected + vec![ + // [a_new ASC, b_new ASC] + vec![("a_new", option_asc), ("b_new", option_asc)], + // [a_new ASC, d_new ASC] + vec![("a_new", option_asc), ("d_new", option_asc)], + // [a_new ASC, b+d ASC] + vec![("a_new", option_asc), ("b+d", option_asc)], + ], + ), + // ------- TEST CASE 8 ---------- + ( + // orderings + vec![ + // [b+d ASC] + vec![(&b_plus_d, option_asc)], + ], + // proj exprs + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (col_d, "d_new".to_string()), + (&b_plus_d, "b+d".to_string()), + ], + // expected + vec![ + // [b+d ASC] + vec![("b+d", option_asc)], + ], + ), + // ------- TEST CASE 9 ---------- + ( + // orderings + vec![ + // [a ASC, d ASC, b ASC] + vec![ + (col_a, option_asc), + (col_d, option_asc), + (col_b, option_asc), + ], + // [c ASC] + vec![(col_c, option_asc)], + ], + // proj exprs + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (col_d, "d_new".to_string()), + (col_c, "c_new".to_string()), + ], + // expected + vec![ + // [a_new ASC, d_new ASC, b_new ASC] + vec![ + ("a_new", option_asc), + ("d_new", option_asc), + ("b_new", option_asc), + ], + // [c_new ASC], + vec![("c_new", option_asc)], + ], + ), + // ------- TEST CASE 10 ---------- + ( + vec![ + // [a ASC, b ASC, c ASC] + vec![ + (col_a, option_asc), + (col_b, option_asc), + (col_c, option_asc), + ], + // [a ASC, d ASC] + vec![(col_a, option_asc), (col_d, option_asc)], + ], + // proj exprs + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (col_c, "c_new".to_string()), + (&c_plus_d, "c+d".to_string()), + ], + // expected + vec![ + // [a_new ASC, b_new ASC, c_new ASC] + vec![ + ("a_new", option_asc), + ("b_new", option_asc), + ("c_new", option_asc), + ], + // [a_new ASC, b_new ASC, c+d ASC] + vec![ + ("a_new", option_asc), + ("b_new", option_asc), + ("c+d", option_asc), + ], + ], + ), + // ------- TEST CASE 11 ---------- + ( + // orderings + vec![ + // [a ASC, b ASC] + vec![(col_a, option_asc), (col_b, option_asc)], + // [a ASC, d ASC] + vec![(col_a, option_asc), (col_d, option_asc)], + ], + // proj exprs + vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (&b_plus_d, "b+d".to_string()), + ], + // expected + vec![ + // [a_new ASC, b_new ASC] + vec![("a_new", option_asc), ("b_new", option_asc)], + // [a_new ASC, b + d ASC] + vec![("a_new", option_asc), ("b+d", option_asc)], + ], + ), + // ------- TEST CASE 12 ---------- + ( + // orderings + vec![ + // [a ASC, b ASC, c ASC] + vec![ + (col_a, option_asc), + (col_b, option_asc), + (col_c, option_asc), + ], + ], + // proj exprs + vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())], + // expected + vec![ + // [a_new ASC] + vec![("a_new", option_asc)], + ], + ), + // ------- TEST CASE 13 ---------- + ( + // orderings + vec![ + // [a ASC, b ASC, c ASC] + vec![ + (col_a, option_asc), + (col_b, option_asc), + (col_c, option_asc), + ], + // [a ASC, a + b ASC, c ASC] + vec![ + (col_a, option_asc), + (&a_plus_b, option_asc), + (col_c, option_asc), + ], + ], + // proj exprs + vec![ + (col_c, "c_new".to_string()), + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (&a_plus_b, "a+b".to_string()), + ], + // expected + vec![ + // [a_new ASC, b_new ASC, c_new ASC] + vec![ + ("a_new", option_asc), + ("b_new", option_asc), + ("c_new", option_asc), + ], + // [a_new ASC, a+b ASC, c_new ASC] + vec![ + ("a_new", option_asc), + ("a+b", option_asc), + ("c_new", option_asc), + ], + ], + ), + // ------- TEST CASE 14 ---------- + ( + // orderings + vec![ + // [a ASC, b ASC] + vec![(col_a, option_asc), (col_b, option_asc)], + // [c ASC, b ASC] + vec![(col_c, option_asc), (col_b, option_asc)], + // [d ASC, e ASC] + vec![(col_d, option_asc), (col_e, option_asc)], + ], + // proj exprs + vec![ + (col_c, "c_new".to_string()), + (col_d, "d_new".to_string()), + (col_a, "a_new".to_string()), + (&b_plus_e, "b+e".to_string()), + ], + // expected + vec![ + // [a_new ASC, d_new ASC, b+e ASC] + vec![ + ("a_new", option_asc), + ("d_new", option_asc), + ("b+e", option_asc), + ], + // [d_new ASC, a_new ASC, b+e ASC] + vec![ + ("d_new", option_asc), + ("a_new", option_asc), + ("b+e", option_asc), + ], + // [c_new ASC, d_new ASC, b+e ASC] + vec![ + ("c_new", option_asc), + ("d_new", option_asc), + ("b+e", option_asc), + ], + // [d_new ASC, c_new ASC, b+e ASC] + vec![ + ("d_new", option_asc), + ("c_new", option_asc), + ("b+e", option_asc), + ], + ], + ), + // ------- TEST CASE 15 ---------- + ( + // orderings + vec![ + // [a ASC, c ASC, b ASC] + vec![ + (col_a, option_asc), + (col_c, option_asc), + (&col_b, option_asc), + ], + ], + // proj exprs + vec![ + (col_c, "c_new".to_string()), + (col_a, "a_new".to_string()), + (&a_plus_b, "a+b".to_string()), + ], + // expected + vec![ + // [a_new ASC, d_new ASC, b+e ASC] + vec![ + ("a_new", option_asc), + ("c_new", option_asc), + ("a+b", option_asc), + ], + ], + ), + // ------- TEST CASE 16 ---------- + ( + // orderings + vec![ + // [a ASC, b ASC] + vec![(col_a, option_asc), (col_b, option_asc)], + // [c ASC, b DESC] + vec![(col_c, option_asc), (col_b, option_desc)], + // [e ASC] + vec![(col_e, option_asc)], + ], + // proj exprs + vec![ + (col_c, "c_new".to_string()), + (col_a, "a_new".to_string()), + (col_b, "b_new".to_string()), + (&b_plus_e, "b+e".to_string()), + ], + // expected + vec![ + // [a_new ASC, b_new ASC] + vec![("a_new", option_asc), ("b_new", option_asc)], + // [a_new ASC, b_new ASC] + vec![("a_new", option_asc), ("b+e", option_asc)], + // [c_new ASC, b_new DESC] + vec![("c_new", option_asc), ("b_new", option_desc)], + ], + ), + ]; + + for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate() + { + let mut eq_properties = EquivalenceProperties::new(schema.clone()); + + let orderings = convert_to_orderings(&orderings); + eq_properties.add_new_orderings(orderings); + + let proj_exprs = proj_exprs + .into_iter() + .map(|(expr, name)| (expr.clone(), name)) + .collect::>(); + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &schema)?; + let output_schema = output_schema(&projection_mapping, &schema)?; + + let expected = expected + .into_iter() + .map(|ordering| { + ordering + .into_iter() + .map(|(name, options)| { + (col(name, &output_schema).unwrap(), options) + }) + .collect::>() + }) + .collect::>(); + let expected = convert_to_orderings_owned(&expected); + + let projected_eq = eq_properties.project(&projection_mapping, output_schema); + let orderings = projected_eq.oeq_class(); + + let err_msg = format!( + "test_idx: {:?}, actual: {:?}, expected: {:?}, projection_mapping: {:?}", + idx, orderings.orderings, expected, projection_mapping + ); + + assert_eq!(orderings.len(), expected.len(), "{}", err_msg); + for expected_ordering in &expected { + assert!(orderings.contains(expected_ordering), "{}", err_msg) + } + } + + Ok(()) + } + + #[test] + fn project_orderings2() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true), + ])); + let col_a = &col("a", &schema)?; + let col_b = &col("b", &schema)?; + let col_c = &col("c", &schema)?; + let col_ts = &col("ts", &schema)?; + let a_plus_b = Arc::new(BinaryExpr::new( + col_a.clone(), + Operator::Plus, + col_b.clone(), + )) as Arc; + let interval = Arc::new(Literal::new(ScalarValue::IntervalDayTime(Some(2)))) + as Arc; + let date_bin_ts = &create_physical_expr( + &BuiltinScalarFunction::DateBin, + &[interval, col_ts.clone()], + &schema, + &ExecutionProps::default(), + )?; + + let round_c = &create_physical_expr( + &BuiltinScalarFunction::Round, + &[col_c.clone()], + &schema, + &ExecutionProps::default(), + )?; + + let option_asc = SortOptions { + descending: false, + nulls_first: false, + }; + + let proj_exprs = vec![ + (col_b, "b_new".to_string()), + (col_a, "a_new".to_string()), + (col_c, "c_new".to_string()), + (date_bin_ts, "date_bin_res".to_string()), + (round_c, "round_c_res".to_string()), + ]; + let proj_exprs = proj_exprs + .into_iter() + .map(|(expr, name)| (expr.clone(), name)) + .collect::>(); + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &schema)?; + let output_schema = output_schema(&projection_mapping, &schema)?; + + let col_a_new = &col("a_new", &output_schema)?; + let col_b_new = &col("b_new", &output_schema)?; + let col_c_new = &col("c_new", &output_schema)?; + let col_date_bin_res = &col("date_bin_res", &output_schema)?; + let col_round_c_res = &col("round_c_res", &output_schema)?; + let a_new_plus_b_new = Arc::new(BinaryExpr::new( + col_a_new.clone(), + Operator::Plus, + col_b_new.clone(), + )) as Arc; + + let test_cases = vec![ + // ---------- TEST CASE 1 ------------ + ( + // orderings + vec![ + // [a ASC] + vec![(col_a, option_asc)], + ], + // expected + vec![ + // [b_new ASC] + vec![(col_a_new, option_asc)], + ], + ), + // ---------- TEST CASE 2 ------------ + ( + // orderings + vec![ + // [a+b ASC] + vec![(&a_plus_b, option_asc)], + ], + // expected + vec![ + // [b_new ASC] + vec![(&a_new_plus_b_new, option_asc)], + ], + ), + // ---------- TEST CASE 3 ------------ + ( + // orderings + vec![ + // [a ASC, ts ASC] + vec![(col_a, option_asc), (col_ts, option_asc)], + ], + // expected + vec![ + // [a_new ASC, date_bin_res ASC] + vec![(col_a_new, option_asc), (col_date_bin_res, option_asc)], + ], + ), + // ---------- TEST CASE 4 ------------ + ( + // orderings + vec![ + // [a ASC, ts ASC, b ASC] + vec![ + (col_a, option_asc), + (col_ts, option_asc), + (col_b, option_asc), + ], + ], + // expected + vec![ + // [a_new ASC, date_bin_res ASC] + // Please note that result is not [a_new ASC, date_bin_res ASC, b_new ASC] + // because, datebin_res may not be 1-1 function. Hence without introducing ts + // dependency we cannot guarantee any ordering after date_bin_res column. + vec![(col_a_new, option_asc), (col_date_bin_res, option_asc)], + ], + ), + // ---------- TEST CASE 5 ------------ + ( + // orderings + vec![ + // [a ASC, c ASC] + vec![(col_a, option_asc), (col_c, option_asc)], + ], + // expected + vec![ + // [a_new ASC, round_c_res ASC, c_new ASC] + vec![(col_a_new, option_asc), (col_round_c_res, option_asc)], + // [a_new ASC, c_new ASC] + vec![(col_a_new, option_asc), (col_c_new, option_asc)], + ], + ), + // ---------- TEST CASE 6 ------------ + ( + // orderings + vec![ + // [c ASC, b ASC] + vec![(col_c, option_asc), (col_b, option_asc)], + ], + // expected + vec![ + // [round_c_res ASC] + vec![(col_round_c_res, option_asc)], + // [c_new ASC, b_new ASC] + vec![(col_c_new, option_asc), (col_b_new, option_asc)], + ], + ), + // ---------- TEST CASE 7 ------------ + ( + // orderings + vec![ + // [a+b ASC, c ASC] + vec![(&a_plus_b, option_asc), (col_c, option_asc)], + ], + // expected + vec![ + // [a+b ASC, round(c) ASC, c_new ASC] + vec![ + (&a_new_plus_b_new, option_asc), + (&col_round_c_res, option_asc), + ], + // [a+b ASC, c_new ASC] + vec![(&a_new_plus_b_new, option_asc), (col_c_new, option_asc)], + ], + ), + ]; + + for (idx, (orderings, expected)) in test_cases.iter().enumerate() { + let mut eq_properties = EquivalenceProperties::new(schema.clone()); + + let orderings = convert_to_orderings(orderings); + eq_properties.add_new_orderings(orderings); + + let expected = convert_to_orderings(expected); + + let projected_eq = + eq_properties.project(&projection_mapping, output_schema.clone()); + let orderings = projected_eq.oeq_class(); + + let err_msg = format!( + "test idx: {:?}, actual: {:?}, expected: {:?}, projection_mapping: {:?}", + idx, orderings.orderings, expected, projection_mapping + ); + + assert_eq!(orderings.len(), expected.len(), "{}", err_msg); + for expected_ordering in &expected { + assert!(orderings.contains(expected_ordering), "{}", err_msg) + } + } + Ok(()) + } + + #[test] + fn project_orderings3() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + Field::new("f", DataType::Int32, true), + ])); + let col_a = &col("a", &schema)?; + let col_b = &col("b", &schema)?; + let col_c = &col("c", &schema)?; + let col_d = &col("d", &schema)?; + let col_e = &col("e", &schema)?; + let col_f = &col("f", &schema)?; + let a_plus_b = Arc::new(BinaryExpr::new( + col_a.clone(), + Operator::Plus, + col_b.clone(), + )) as Arc; + + let option_asc = SortOptions { + descending: false, + nulls_first: false, + }; + + let proj_exprs = vec![ + (col_c, "c_new".to_string()), + (col_d, "d_new".to_string()), + (&a_plus_b, "a+b".to_string()), + ]; + let proj_exprs = proj_exprs + .into_iter() + .map(|(expr, name)| (expr.clone(), name)) + .collect::>(); + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &schema)?; + let output_schema = output_schema(&projection_mapping, &schema)?; + + let col_a_plus_b_new = &col("a+b", &output_schema)?; + let col_c_new = &col("c_new", &output_schema)?; + let col_d_new = &col("d_new", &output_schema)?; + + let test_cases = vec![ + // ---------- TEST CASE 1 ------------ + ( + // orderings + vec![ + // [d ASC, b ASC] + vec![(col_d, option_asc), (col_b, option_asc)], + // [c ASC, a ASC] + vec![(col_c, option_asc), (col_a, option_asc)], + ], + // equal conditions + vec![], + // expected + vec![ + // [d_new ASC, c_new ASC, a+b ASC] + vec![ + (col_d_new, option_asc), + (col_c_new, option_asc), + (col_a_plus_b_new, option_asc), + ], + // [c_new ASC, d_new ASC, a+b ASC] + vec![ + (col_c_new, option_asc), + (col_d_new, option_asc), + (col_a_plus_b_new, option_asc), + ], + ], + ), + // ---------- TEST CASE 2 ------------ + ( + // orderings + vec![ + // [d ASC, b ASC] + vec![(col_d, option_asc), (col_b, option_asc)], + // [c ASC, e ASC], Please note that a=e + vec![(col_c, option_asc), (col_e, option_asc)], + ], + // equal conditions + vec![(col_e, col_a)], + // expected + vec![ + // [d_new ASC, c_new ASC, a+b ASC] + vec![ + (col_d_new, option_asc), + (col_c_new, option_asc), + (col_a_plus_b_new, option_asc), + ], + // [c_new ASC, d_new ASC, a+b ASC] + vec![ + (col_c_new, option_asc), + (col_d_new, option_asc), + (col_a_plus_b_new, option_asc), + ], + ], + ), + // ---------- TEST CASE 3 ------------ + ( + // orderings + vec![ + // [d ASC, b ASC] + vec![(col_d, option_asc), (col_b, option_asc)], + // [c ASC, e ASC], Please note that a=f + vec![(col_c, option_asc), (col_e, option_asc)], + ], + // equal conditions + vec![(col_a, col_f)], + // expected + vec![ + // [d_new ASC] + vec![(col_d_new, option_asc)], + // [c_new ASC] + vec![(col_c_new, option_asc)], + ], + ), + ]; + for (orderings, equal_columns, expected) in test_cases { + let mut eq_properties = EquivalenceProperties::new(schema.clone()); + for (lhs, rhs) in equal_columns { + eq_properties.add_equal_conditions(lhs, rhs); + } + + let orderings = convert_to_orderings(&orderings); + eq_properties.add_new_orderings(orderings); + + let expected = convert_to_orderings(&expected); + + let projected_eq = + eq_properties.project(&projection_mapping, output_schema.clone()); + let orderings = projected_eq.oeq_class(); + + let err_msg = format!( + "actual: {:?}, expected: {:?}, projection_mapping: {:?}", + orderings.orderings, expected, projection_mapping + ); + + assert_eq!(orderings.len(), expected.len(), "{}", err_msg); + for expected_ordering in &expected { + assert!(orderings.contains(expected_ordering), "{}", err_msg) + } + } + + Ok(()) + } + + #[test] + fn project_orderings_random() -> Result<()> { + const N_RANDOM_SCHEMA: usize = 20; + const N_ELEMENTS: usize = 125; + const N_DISTINCT: usize = 5; + + for seed in 0..N_RANDOM_SCHEMA { + // Create a random schema with random properties + let (test_schema, eq_properties) = create_random_schema(seed as u64)?; + // Generate a data that satisfies properties given + let table_data_with_properties = + generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?; + // Floor(a) + let floor_a = create_physical_expr( + &BuiltinScalarFunction::Floor, + &[col("a", &test_schema)?], + &test_schema, + &ExecutionProps::default(), + )?; + // a + b + let a_plus_b = Arc::new(BinaryExpr::new( + col("a", &test_schema)?, + Operator::Plus, + col("b", &test_schema)?, + )) as Arc; + let proj_exprs = vec![ + (col("a", &test_schema)?, "a_new"), + (col("b", &test_schema)?, "b_new"), + (col("c", &test_schema)?, "c_new"), + (col("d", &test_schema)?, "d_new"), + (col("e", &test_schema)?, "e_new"), + (col("f", &test_schema)?, "f_new"), + (floor_a, "floor(a)"), + (a_plus_b, "a+b"), + ]; + + for n_req in 0..=proj_exprs.len() { + for proj_exprs in proj_exprs.iter().combinations(n_req) { + let proj_exprs = proj_exprs + .into_iter() + .map(|(expr, name)| (expr.clone(), name.to_string())) + .collect::>(); + let (projected_batch, projected_eq) = apply_projection( + proj_exprs.clone(), + &table_data_with_properties, + &eq_properties, + )?; + + // Make sure each ordering after projection is valid. + for ordering in projected_eq.oeq_class().iter() { + let err_msg = format!( + "Error in test case ordering:{:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}, proj_exprs: {:?}", + ordering, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants, proj_exprs + ); + // Since ordered section satisfies schema, we expect + // that result will be same after sort (e.g sort was unnecessary). + assert!( + is_table_same_after_sort( + ordering.clone(), + projected_batch.clone(), + )?, + "{}", + err_msg + ); + } + } + } + } + + Ok(()) + } + + #[test] + fn ordering_satisfy_after_projection_random() -> Result<()> { + const N_RANDOM_SCHEMA: usize = 20; + const N_ELEMENTS: usize = 125; + const N_DISTINCT: usize = 5; + const SORT_OPTIONS: SortOptions = SortOptions { + descending: false, + nulls_first: false, + }; + + for seed in 0..N_RANDOM_SCHEMA { + // Create a random schema with random properties + let (test_schema, eq_properties) = create_random_schema(seed as u64)?; + // Generate a data that satisfies properties given + let table_data_with_properties = + generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?; + // Floor(a) + let floor_a = create_physical_expr( + &BuiltinScalarFunction::Floor, + &[col("a", &test_schema)?], + &test_schema, + &ExecutionProps::default(), + )?; + // a + b + let a_plus_b = Arc::new(BinaryExpr::new( + col("a", &test_schema)?, + Operator::Plus, + col("b", &test_schema)?, + )) as Arc; + let proj_exprs = vec![ + (col("a", &test_schema)?, "a_new"), + (col("b", &test_schema)?, "b_new"), + (col("c", &test_schema)?, "c_new"), + (col("d", &test_schema)?, "d_new"), + (col("e", &test_schema)?, "e_new"), + (col("f", &test_schema)?, "f_new"), + (floor_a, "floor(a)"), + (a_plus_b, "a+b"), + ]; + + for n_req in 0..=proj_exprs.len() { + for proj_exprs in proj_exprs.iter().combinations(n_req) { + let proj_exprs = proj_exprs + .into_iter() + .map(|(expr, name)| (expr.clone(), name.to_string())) + .collect::>(); + let (projected_batch, projected_eq) = apply_projection( + proj_exprs.clone(), + &table_data_with_properties, + &eq_properties, + )?; + + let projection_mapping = + ProjectionMapping::try_new(&proj_exprs, &test_schema)?; + + let projected_exprs = projection_mapping + .iter() + .map(|(_source, target)| target.clone()) + .collect::>(); + + for n_req in 0..=projected_exprs.len() { + for exprs in projected_exprs.iter().combinations(n_req) { + let requirement = exprs + .into_iter() + .map(|expr| PhysicalSortExpr { + expr: expr.clone(), + options: SORT_OPTIONS, + }) + .collect::>(); + let expected = is_table_same_after_sort( + requirement.clone(), + projected_batch.clone(), + )?; + let err_msg = format!( + "Error in test case requirement:{:?}, expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}, projected_eq.oeq_class: {:?}, projected_eq.eq_group: {:?}, projected_eq.constants: {:?}, projection_mapping: {:?}", + requirement, expected, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants, projected_eq.oeq_class, projected_eq.eq_group, projected_eq.constants, projection_mapping + ); + // Check whether ordering_satisfy API result and + // experimental result matches. + assert_eq!( + projected_eq.ordering_satisfy(&requirement), + expected, + "{}", + err_msg + ); + } + } + } + } + } + + Ok(()) + } + + #[test] + fn test_expr_consists_of_constants() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true), + ])); + let col_a = col("a", &schema)?; + let col_b = col("b", &schema)?; + let col_d = col("d", &schema)?; + let b_plus_d = Arc::new(BinaryExpr::new( + col_b.clone(), + Operator::Plus, + col_d.clone(), + )) as Arc; + + let constants = vec![col_a.clone(), col_b.clone()]; + let expr = b_plus_d.clone(); + assert!(!is_constant_recurse(&constants, &expr)); + + let constants = vec![col_a.clone(), col_b.clone(), col_d.clone()]; + let expr = b_plus_d.clone(); + assert!(is_constant_recurse(&constants, &expr)); + Ok(()) + } + + #[test] + fn test_join_equivalence_properties() -> Result<()> { + let schema = create_test_schema()?; + let col_a = &col("a", &schema)?; + let col_b = &col("b", &schema)?; + let col_c = &col("c", &schema)?; + let offset = schema.fields.len(); + let col_a2 = &add_offset_to_expr(col_a.clone(), offset); + let col_b2 = &add_offset_to_expr(col_b.clone(), offset); + let option_asc = SortOptions { + descending: false, + nulls_first: false, + }; + let test_cases = vec![ + // ------- TEST CASE 1 -------- + // [a ASC], [b ASC] + ( + // [a ASC], [b ASC] + vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]], + // [a ASC], [b ASC] + vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]], + // expected [a ASC, a2 ASC], [a ASC, b2 ASC], [b ASC, a2 ASC], [b ASC, b2 ASC] + vec![ + vec![(col_a, option_asc), (col_a2, option_asc)], + vec![(col_a, option_asc), (col_b2, option_asc)], + vec![(col_b, option_asc), (col_a2, option_asc)], + vec![(col_b, option_asc), (col_b2, option_asc)], + ], + ), + // ------- TEST CASE 2 -------- + // [a ASC], [b ASC] + ( + // [a ASC], [b ASC], [c ASC] + vec![ + vec![(col_a, option_asc)], + vec![(col_b, option_asc)], + vec![(col_c, option_asc)], + ], + // [a ASC], [b ASC] + vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]], + // expected [a ASC, a2 ASC], [a ASC, b2 ASC], [b ASC, a2 ASC], [b ASC, b2 ASC], [c ASC, a2 ASC], [c ASC, b2 ASC] + vec![ + vec![(col_a, option_asc), (col_a2, option_asc)], + vec![(col_a, option_asc), (col_b2, option_asc)], + vec![(col_b, option_asc), (col_a2, option_asc)], + vec![(col_b, option_asc), (col_b2, option_asc)], + vec![(col_c, option_asc), (col_a2, option_asc)], + vec![(col_c, option_asc), (col_b2, option_asc)], + ], + ), + ]; + for (left_orderings, right_orderings, expected) in test_cases { + let mut left_eq_properties = EquivalenceProperties::new(schema.clone()); + let mut right_eq_properties = EquivalenceProperties::new(schema.clone()); + let left_orderings = convert_to_orderings(&left_orderings); + let right_orderings = convert_to_orderings(&right_orderings); + let expected = convert_to_orderings(&expected); + left_eq_properties.add_new_orderings(left_orderings); + right_eq_properties.add_new_orderings(right_orderings); + let join_eq = join_equivalence_properties( + left_eq_properties, + right_eq_properties, + &JoinType::Inner, + Arc::new(Schema::empty()), + &[true, false], + Some(JoinSide::Left), + &[], + ); + let orderings = &join_eq.oeq_class.orderings; + let err_msg = format!("expected: {:?}, actual:{:?}", expected, orderings); + assert_eq!( + join_eq.oeq_class.orderings.len(), + expected.len(), + "{}", + err_msg + ); + for ordering in orderings { + assert!( + expected.contains(ordering), + "{}, ordering: {:?}", + err_msg, + ordering + ); + } + } Ok(()) } }