From cf56105021edcf2c2d2efa2f0d3f8febb80b2a0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Wed, 19 Jul 2023 16:08:28 +0300 Subject: [PATCH] Feature/determine prunability (#139) * ready to review * license added * simplifications * simplifications * sort expr's are taken separately for each table * we can return the sort info of the expression now * check filter conditions * simplifications * simplifications * functions are implemented for SortInfo calculations * node specialized tableSide functions * NotImplemented errors are added, test comments are added * Comment change * Simplify comparison node calculations * Simplfications and better commenting * is_prunable function is updated with new Prunability function * Indices of sort expressions are updated with intermediate schema columns of the filter * Unused function is removed * Future-proof index updating * An if let check is removed * simplifications * Simplifications * simplifications * Change if condition * Determine prunability of tables for join operations (#90) * ready to review * license added * simplifications * simplifications * sort expr's are taken separately for each table * we can return the sort info of the expression now * check filter conditions * simplifications * simplifications * functions are implemented for SortInfo calculations * node specialized tableSide functions * NotImplemented errors are added, test comments are added * Comment change * Simplify comparison node calculations * Simplfications and better commenting * is_prunable function is updated with new Prunability function * Indices of sort expressions are updated with intermediate schema columns of the filter * Unused function is removed * Future-proof index updating * An if let check is removed * simplifications * Simplifications * simplifications * Change if condition --------- Co-authored-by: Mustafa Akur Co-authored-by: Mehmet Ozan Kabak * fix the tables' unboundedness --------- Co-authored-by: Mustafa Akur Co-authored-by: Mehmet Ozan Kabak --- .../physical_optimizer/pipeline_checker.rs | 108 ++- .../tests/sqllogictests/test_files/join.slt | 53 ++ datafusion/physical-expr/src/intervals/mod.rs | 1 + .../src/intervals/prunability.rs | 892 ++++++++++++++++++ 4 files changed, 1051 insertions(+), 3 deletions(-) create mode 100644 datafusion/physical-expr/src/intervals/prunability.rs diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index b12c4ef93fc86..661b1c5f4162d 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -22,12 +22,16 @@ use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; use crate::physical_plan::joins::SymmetricHashJoinExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::config::OptimizerOptions; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::DataFusionError; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::intervals::prunability::{ExprPrunabilityGraph, TableSide}; use datafusion_physical_expr::intervals::{check_support, is_datatype_supported}; +use datafusion_physical_expr::PhysicalSortExpr; use std::sync::Arc; /// The PipelineChecker rule rejects non-runnable query plans that use @@ -138,7 +142,8 @@ pub fn check_finiteness_requirements( ) -> Result> { if let Some(exec) = input.plan.as_any().downcast_ref::() { if !(optimizer_options.allow_symmetric_joins_without_pruning - || (exec.check_if_order_information_available()? && is_prunable(exec))) + || (exec.check_if_order_information_available()? + && is_prunable(exec, &input.children_unbounded))) { const MSG: &str = "Join operation cannot operate on a non-prunable stream without enabling \ the 'allow_symmetric_joins_without_pruning' configuration flag"; @@ -161,17 +166,114 @@ pub fn check_finiteness_requirements( /// /// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr /// [`Operator`]: datafusion_expr::Operator -fn is_prunable(join: &SymmetricHashJoinExec) -> bool { - join.filter().map_or(false, |filter| { +fn is_prunable(join: &SymmetricHashJoinExec, children_unbounded: &[bool]) -> bool { + if join.filter().map_or(false, |filter| { check_support(filter.expression()) && filter .schema() .fields() .iter() .all(|f| is_datatype_supported(f.data_type())) + }) { + if let Some(filter) = join.filter() { + if let Ok(mut graph) = + ExprPrunabilityGraph::try_new((*filter.expression()).clone()) + { + let left_sort_expr = join.left().output_ordering().map(|s| s[0].clone()); + let right_sort_expr = + join.right().output_ordering().map(|s| s[0].clone()); + let new_left_sort = get_sort_expr_in_filter_schema( + &left_sort_expr, + filter, + JoinSide::Left, + ); + let new_right_sort = get_sort_expr_in_filter_schema( + &right_sort_expr, + filter, + JoinSide::Right, + ); + if let Ok((table_side, _)) = + graph.analyze_prunability(&new_left_sort, &new_right_sort) + { + return prunability_for_unbounded_tables( + children_unbounded[0], + children_unbounded[1], + &table_side, + ); + } + } + } + } + false +} + +// Updates index of the column with the new index (if PhysicalExpr is Column) +fn update_column_index( + sort_expr: &Option, + updated_idx: usize, +) -> Option { + sort_expr.as_ref().and_then(|sort_expr| { + sort_expr + .expr + .as_any() + .downcast_ref::() + .map(|column| { + let sort_name = column.name(); + let options = sort_expr.options; + let expr = Arc::new(Column::new(sort_name, updated_idx)); + PhysicalSortExpr { expr, options } + }) }) } +fn get_sort_expr_in_filter_schema( + sort_expr: &Option, + filter: &JoinFilter, + side: JoinSide, +) -> Option { + let sorted_column_index_in_filter = find_index_in_filter(filter, sort_expr, side); + sorted_column_index_in_filter.and_then(|idx| update_column_index(sort_expr, idx)) +} + +fn find_index_in_filter( + join_filter: &JoinFilter, + left_sort_expr: &Option, + join_side: JoinSide, +) -> Option { + for (i, (field, column_index)) in join_filter + .schema() + .fields() + .iter() + .zip(join_filter.column_indices()) + .enumerate() + { + if let Some(physical_sort) = left_sort_expr { + if let Some(column) = physical_sort.expr.as_any().downcast_ref::() { + if column.name() == field.name() && column_index.side == join_side { + return Some(i); + } + } + } + } + None +} + +fn prunability_for_unbounded_tables( + left_unbounded: bool, + right_unbounded: bool, + table_side: &TableSide, +) -> bool { + let (left_prunable, right_prunable) = match table_side { + TableSide::Left => (true, false), + TableSide::Right => (false, true), + TableSide::Both => (true, true), + TableSide::None => (false, false), + }; + // If both sides are either bounded or prunable, return true (Can do calculations with bounded memory) + // Otherwise return false (Cannot do calculations with bounded memory) + (!left_unbounded || left_prunable) && (!right_unbounded || right_prunable) +} + #[cfg(test)] mod sql_tests { use super::*; diff --git a/datafusion/core/tests/sqllogictests/test_files/join.slt b/datafusion/core/tests/sqllogictests/test_files/join.slt index 283ff57a984cb..ca2805d99a411 100644 --- a/datafusion/core/tests/sqllogictests/test_files/join.slt +++ b/datafusion/core/tests/sqllogictests/test_files/join.slt @@ -590,3 +590,56 @@ drop table IF EXISTS full_join_test; # batch size statement ok set datafusion.execution.batch_size = 8192; + +# test joins give error with unbounded tables by the analysis of prunability +statement ok +CREATE unbounded external table t1(a1 integer, a2 integer, a3 integer) +STORED as CSV +WITH HEADER ROW +WITH ORDER (a2 ASC, a3 ASC) +OPTIONS('infinite_source' 'true') +LOCATION 'tests/data/empty.csv'; + +statement ok +CREATE unbounded external table t2(a1 integer, a2 integer, a3 integer) +STORED as CSV +WITH HEADER ROW +WITH ORDER (a2 ASC, a3 ASC) +OPTIONS('infinite_source' 'true') +LOCATION 'tests/data/empty.csv'; + +statement ok +set datafusion.optimizer.allow_symmetric_joins_without_pruning = false + +# query with a filter causing table to be not prunable +query error DataFusion error: PipelineChecker +SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3 +FROM t1 +JOIN t2 +ON t1.a1 = t2.a1 +WHERE t1.a3 < t2.a3 AND t1.a3 >= t2.a3 + +# query with a filter causing table to be prunable +statement ok +SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3 +FROM t1 +JOIN t2 +ON t1.a1 = t2.a1 +WHERE t2.a2 >= t1.a2 AND t1.a2 > t2.a2 + +statement ok +set datafusion.optimizer.allow_symmetric_joins_without_pruning = true + +# query with a filter causing table to be not prunable, but the join is allowed +statement ok +SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3 +FROM t1 +JOIN t2 +ON t1.a1 = t2.a1 +WHERE t1.a2 < t2.a2 AND t1.a2 < t2.a2 + +statement ok +drop table t1; + +statement ok +drop table t2; \ No newline at end of file diff --git a/datafusion/physical-expr/src/intervals/mod.rs b/datafusion/physical-expr/src/intervals/mod.rs index a9255752fea44..ade5035e2d2b6 100644 --- a/datafusion/physical-expr/src/intervals/mod.rs +++ b/datafusion/physical-expr/src/intervals/mod.rs @@ -19,6 +19,7 @@ pub mod cp_solver; pub mod interval_aritmetic; +pub mod prunability; pub mod rounding; pub mod test_utils; diff --git a/datafusion/physical-expr/src/intervals/prunability.rs b/datafusion/physical-expr/src/intervals/prunability.rs new file mode 100644 index 0000000000000..21c7f7f18a871 --- /dev/null +++ b/datafusion/physical-expr/src/intervals/prunability.rs @@ -0,0 +1,892 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Display, Formatter}; +use std::sync::Arc; + +use petgraph::graph::NodeIndex; +use petgraph::stable_graph::StableGraph; +use petgraph::visit::DfsPostOrder; +use petgraph::Outgoing; + +use crate::expressions::{BinaryExpr, Column}; +use crate::utils::{build_dag, ExprTreeNode}; +use crate::{PhysicalExpr, PhysicalSortExpr}; + +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::Operator; + +/// This object implements a directed acyclic expression graph (DAEG) that +/// is used to determine whether a [`PhysicalExpr`] is prunable given some +/// [`PhysicalSortExpr`]. If so, the prunable table side can be inferred. +/// To decide prunability, we consider monotonicity properties of expressions +/// as well as their orderings and their null placement properties. +#[derive(Clone)] +pub struct ExprPrunabilityGraph { + graph: StableGraph, + root: NodeIndex, +} + +/// This object acts as a propagator of monotonicity information as we traverse +/// the DAEG bottom-up. +#[derive(PartialEq, Debug, Clone, Copy)] +pub enum Monotonicity { + Asc, + Desc, + Unordered, + Singleton, +} + +/// This object acts as a propagator of "prunable table side" information +/// as we traverse the DAEG bottom-up. Column leaves are assigned a `TableSide`, +/// and we carry this information upwards according to expression properties. +#[derive(PartialEq, Debug, Clone, Copy)] +pub enum TableSide { + None, + Left, + Right, + Both, +} + +/// This object encapsulates monotonicity properties together with NULL +/// placement and acts as a joint propagator during bottom-up traversals +/// of the DAEG. +#[derive(PartialEq, Debug, Clone, Copy)] +pub struct SortInfo { + dir: Monotonicity, + nulls_first: bool, + nulls_last: bool, +} + +fn get_tableside_at_numeric( + left_dir: &Monotonicity, + right_dir: &Monotonicity, + left_tableside: &TableSide, + right_tableside: &TableSide, +) -> TableSide { + match (left_dir, right_dir) { + (Monotonicity::Singleton, Monotonicity::Singleton) => TableSide::None, + (Monotonicity::Singleton, _) => *right_tableside, + (_, Monotonicity::Singleton) => *left_tableside, + (_, _) => { + if left_tableside == right_tableside { + *left_tableside + } else { + TableSide::None + } + } + } +} +fn get_tableside_at_gt_or_gteq( + left_dir: &Monotonicity, + right_dir: &Monotonicity, + left_tableside: &TableSide, + right_tableside: &TableSide, +) -> TableSide { + match (left_dir, right_dir) { + (Monotonicity::Asc, Monotonicity::Asc) => *left_tableside, + (Monotonicity::Desc, Monotonicity::Desc) => *right_tableside, + (_, _) => TableSide::None, + } +} +fn get_tableside_at_and( + left_tableside: &TableSide, + right_tableside: &TableSide, +) -> TableSide { + match (left_tableside, right_tableside) { + (TableSide::Left, TableSide::Right) | (TableSide::Right, TableSide::Left) => { + TableSide::Both + } + (TableSide::Left, _) | (_, TableSide::Left) => TableSide::Left, + (TableSide::Right, _) | (_, TableSide::Right) => TableSide::Right, + (_, _) => TableSide::None, + } +} + +impl Monotonicity { + fn add(&self, rhs: &Self) -> Self { + match (self, rhs) { + (Monotonicity::Singleton, Monotonicity::Singleton) => Monotonicity::Singleton, + (Monotonicity::Singleton, rhs) => *rhs, + (lhs, Monotonicity::Singleton) => *lhs, + (Monotonicity::Asc, Monotonicity::Asc) => Monotonicity::Asc, + (Monotonicity::Desc, Monotonicity::Desc) => Monotonicity::Desc, + (_, _) => Monotonicity::Unordered, + } + } + + fn sub(&self, rhs: &Self) -> Self { + match (self, rhs) { + (Monotonicity::Singleton, Monotonicity::Singleton) => Monotonicity::Singleton, + (Monotonicity::Singleton, rhs) => { + if *rhs == Monotonicity::Asc { + Monotonicity::Desc + } else if *rhs == Monotonicity::Desc { + Monotonicity::Asc + } else { + Monotonicity::Unordered + } + } + (lhs, Monotonicity::Singleton) => *lhs, + (Monotonicity::Asc, Monotonicity::Desc) => Monotonicity::Asc, + (Monotonicity::Desc, Monotonicity::Asc) => Monotonicity::Desc, + (_, _) => Monotonicity::Unordered, + } + } + + fn gt_or_gteq(&self, rhs: &Self) -> Self { + match (self, rhs) { + (Monotonicity::Singleton, rhs) => { + if *rhs == Monotonicity::Asc { + Monotonicity::Desc + } else if *rhs == Monotonicity::Desc { + Monotonicity::Asc + } else { + Monotonicity::Unordered + } + } + (lhs, Monotonicity::Singleton) => *lhs, + (Monotonicity::Asc, Monotonicity::Desc) => Monotonicity::Asc, + (Monotonicity::Desc, Monotonicity::Asc) => Monotonicity::Desc, + (_, _) => Monotonicity::Unordered, + } + } + + fn and(&self, rhs: &Self) -> Self { + match (self, rhs) { + (Monotonicity::Asc, Monotonicity::Asc) + | (Monotonicity::Asc, Monotonicity::Singleton) + | (Monotonicity::Singleton, Monotonicity::Asc) => Monotonicity::Asc, + (Monotonicity::Desc, Monotonicity::Desc) + | (Monotonicity::Desc, Monotonicity::Singleton) + | (Monotonicity::Singleton, Monotonicity::Desc) => Monotonicity::Desc, + (Monotonicity::Singleton, Monotonicity::Singleton) => Monotonicity::Singleton, + (_, _) => Monotonicity::Unordered, + } + } +} + +macro_rules! sortinfo_op { + ($OP_NAME:ident) => { + fn $OP_NAME(&self, rhs: &Self) -> Self { + SortInfo { + dir: self.dir.$OP_NAME(&rhs.dir), + nulls_first: self.nulls_first || rhs.nulls_first, + nulls_last: self.nulls_last || rhs.nulls_last, + } + } + }; +} + +impl SortInfo { + sortinfo_op!(add); + sortinfo_op!(sub); + sortinfo_op!(gt_or_gteq); + sortinfo_op!(and); +} + +/// This object represents a node in the DAEG we use for monotonicity analysis. +#[derive(Clone, Debug)] +pub struct ExprPrunabilityGraphNode { + expr: Arc, + table_side: TableSide, + sort_info: SortInfo, +} + +impl Display for ExprPrunabilityGraphNode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Expr:{}, TableSide:{:?}, SortInfo:{:?}", + self.expr, self.table_side, self.sort_info + ) + } +} + +impl ExprPrunabilityGraphNode { + /// Constructs a new DAEG node with an unordered [`SortInfo`]. + pub fn new(expr: Arc) -> Self { + ExprPrunabilityGraphNode { + expr, + table_side: TableSide::None, + sort_info: SortInfo { + dir: Monotonicity::Unordered, + nulls_first: false, + nulls_last: false, + }, + } + } + + /// This function creates a DAEG node from Datafusion's [`ExprTreeNode`] object. + pub fn make_node(node: &ExprTreeNode) -> ExprPrunabilityGraphNode { + ExprPrunabilityGraphNode::new(node.expression().clone()) + } +} + +impl PartialEq for ExprPrunabilityGraphNode { + fn eq(&self, other: &ExprPrunabilityGraphNode) -> bool { + self.expr.eq(&other.expr) + && self.table_side.eq(&other.table_side) + && self.sort_info.eq(&other.sort_info) + } +} + +impl ExprPrunabilityGraph { + pub fn try_new(expr: Arc) -> Result { + // Build the full graph: + let (root, graph) = build_dag(expr, &ExprPrunabilityGraphNode::make_node)?; + Ok(Self { graph, root }) + } + + /// This function traverses the whole graph. It returns prunable table sides + /// and sort information of the binary expression that generates the graph. + pub fn analyze_prunability( + &mut self, + left_sort_expr: &Option, + right_sort_expr: &Option, + ) -> Result<(TableSide, SortInfo)> { + let mut includes_filter = false; + // Since children nodes determine their parent's properties, we use DFS: + let mut dfs = DfsPostOrder::new(&self.graph, self.root); + while let Some(node) = dfs.next(&self.graph) { + let children = self.graph.neighbors_directed(node, Outgoing); + let mut children_prunability = children + .map(|child| { + (&self.graph[child].table_side, &self.graph[child].sort_info) + }) + .collect::>(); + + // Handle leaf nodes: + if children_prunability.is_empty() { + // Initialize as an unordered column with an undefined table side: + self.graph[node].table_side = TableSide::None; + self.graph[node].sort_info.dir = Monotonicity::Unordered; + // By convention, we initially set nulls_first and nulls_last to false. + self.graph[node].sort_info.nulls_first = false; + self.graph[node].sort_info.nulls_last = false; + + self.update_node_with_sort_information( + left_sort_expr, + right_sort_expr, + node, + ); + } + // Handle intermediate nodes: + else { + // The graph library gives children in the reverse order, so we + // "fix" the order by the following reversal: + children_prunability.reverse(); + let binary_expr = self.graph[node].expr.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Internal("PhysicalExpr under investigation for prunability should be BinaryExpr.".to_string()) + })?; + (self.graph[node].table_side, self.graph[node].sort_info) = + if binary_expr.op().is_numerical_operators() { + numeric_node_order( + &children_prunability, + binary_expr, + left_sort_expr, + right_sort_expr, + )? + } else if binary_expr.op().is_comparison_operator() { + includes_filter = true; + comparison_node_order(&children_prunability, binary_expr.op())? + } else if *binary_expr.op() == Operator::And { + logical_node_order(&children_prunability) + } else { + return Err(DataFusionError::NotImplemented(format!( + "Prunability is not supported yet for the logical operator {}", + *binary_expr.op() + ))); + }; + } + } + Ok(if includes_filter { + ( + self.graph[self.root].table_side, + self.graph[self.root].sort_info, + ) + } else { + (TableSide::None, self.graph[self.root].sort_info) + }) + } + + /// This function takes the index of a leaf node, which may contain a column + /// or a literal. If it does contain a column, it scans the `PhysicalSortExpr`s + /// and checks if there is a match with that column. If so, `TableSide` and + /// `SortInfo` of the node is set accordingly. + fn update_node_with_sort_information( + &mut self, + left_sort_expr: &Option, + right_sort_expr: &Option, + node: NodeIndex, + ) { + let current = &mut self.graph[node]; + if let Some(column) = current.expr.as_any().downcast_ref::() { + for (table_side, sort_expr) in [ + (TableSide::Left, left_sort_expr), + (TableSide::Right, right_sort_expr), + ] + .into_iter() + { + // We handle non-column sort expressions in intermediate nodes. + // Thus, we need only to check column sort expressions: + if let Some(sort_expr) = sort_expr { + if let Some(sorted) = sort_expr.expr.as_any().downcast_ref::() + { + if column == sorted { + current.table_side = table_side; + current.sort_info.dir = if sort_expr.options.descending { + Monotonicity::Desc + } else { + Monotonicity::Asc + }; + let nulls_first = sort_expr.options.nulls_first; + current.sort_info.nulls_first = nulls_first; + current.sort_info.nulls_last = !nulls_first; + break; + } + } + } + } + } else { + // If the leaf node is not a column, it must be a literal. Its + // table side is already initialized as None, no need to change it. + current.sort_info.dir = Monotonicity::Singleton; + current.sort_info.nulls_first = false; + current.sort_info.nulls_last = false; + } + } +} + +fn numeric_node_order( + children: &[(&TableSide, &SortInfo)], + binary_expr: &BinaryExpr, + left_sort_expr: &Option, + right_sort_expr: &Option, +) -> Result<(TableSide, SortInfo)> { + // There may be SortOptions for BinaryExpr's like a + b. We handle such + // situations here: + for (table_side, sort_expr) in [ + (TableSide::Left, left_sort_expr), + (TableSide::Right, right_sort_expr), + ] + .into_iter() + { + if let Some(sort_expr) = sort_expr { + if let Some(binary_expr_sorted) = + sort_expr.expr.as_any().downcast_ref::() + { + if binary_expr.eq(binary_expr_sorted) { + let dir = if sort_expr.options.descending { + Monotonicity::Desc + } else { + Monotonicity::Asc + }; + let nulls_first = sort_expr.options.nulls_first; + return Ok(( + table_side, + SortInfo { + dir, + nulls_first, + nulls_last: !nulls_first, + }, + )); + } + } + } + } + let ((left_ts, left_si), (right_ts, right_si)) = (children[0], children[1]); + let from = get_tableside_at_numeric(&left_si.dir, &right_si.dir, left_ts, right_ts); + match binary_expr.op() { + Operator::Plus => Ok((from, left_si.add(right_si))), + Operator::Minus => Ok((from, left_si.sub(right_si))), + op => Err(DataFusionError::NotImplemented(format!( + "Prunability is not supported yet for binary expressions having the {op} operator" + ))) + } +} + +fn comparison_node_order( + children: &[(&TableSide, &SortInfo)], + binary_expr_op: &Operator, +) -> Result<(TableSide, SortInfo)> { + let ((left_ts, left_si), (right_ts, right_si)) = (children[0], children[1]); + match binary_expr_op { + Operator::Gt | Operator::GtEq => Ok(( + get_tableside_at_gt_or_gteq(&left_si.dir, &right_si.dir, left_ts, right_ts), + left_si.gt_or_gteq(right_si), + )), + Operator::Lt | Operator::LtEq => Ok(( + get_tableside_at_gt_or_gteq(&right_si.dir, &left_si.dir, right_ts, left_ts), + right_si.gt_or_gteq(left_si), + )), + op => Err(DataFusionError::NotImplemented(format!( + "Prunability is not supported yet for binary expressions having the {op} operator" + ), + )) + } +} + +fn logical_node_order(children: &[(&TableSide, &SortInfo)]) -> (TableSide, SortInfo) { + let ((left_ts, left_si), (right_ts, right_si)) = (children[0], children[1]); + ( + get_tableside_at_and(left_ts, right_ts), + left_si.and(right_si), + ) +} + +#[cfg(test)] +mod tests { + use std::ops::Not; + + use super::*; + use crate::expressions::{col, BinaryExpr, Literal}; + use arrow_schema::{DataType, Field, Schema, SortOptions}; + use datafusion_common::ScalarValue; + + // This experiment expects its input expression to be in the form: + // ( (left_column + a) > (right_column + b) ) AND ( (left_column + c) < (right_column + d) ) + fn experiment_prunability( + schema_left: &Schema, + schema_right: &Schema, + expr: Arc, + ) -> Result<()> { + let left_sorted_asc = PhysicalSortExpr { + expr: col("left_column", schema_left)?, + options: SortOptions::default(), + }; + let right_sorted_asc = PhysicalSortExpr { + expr: col("right_column", schema_right)?, + options: SortOptions::default(), + }; + let left_sorted_desc = PhysicalSortExpr { + expr: col("left_column", schema_left)?, + options: SortOptions::default().not(), + }; + let right_sorted_desc = PhysicalSortExpr { + expr: col("right_column", schema_right)?, + options: SortOptions::default().not(), + }; + + let mut graph = ExprPrunabilityGraph::try_new(expr)?; + + assert_eq!( + ( + TableSide::Both, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: true, + nulls_last: false + } + ), + graph.analyze_prunability( + &Some(left_sorted_asc.clone()), + &Some(right_sorted_asc.clone()), + )? + ); + assert_eq!( + ( + TableSide::None, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: true, + nulls_last: true + } + ), + graph.analyze_prunability( + &Some(left_sorted_asc.clone()), + &Some(right_sorted_desc.clone()), + )? + ); + assert_eq!( + ( + TableSide::None, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: true, + nulls_last: true + } + ), + graph.analyze_prunability( + &Some(left_sorted_desc.clone()), + &Some(right_sorted_asc), + )? + ); + assert_eq!( + ( + TableSide::Both, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: false, + nulls_last: true + } + ), + graph + .analyze_prunability(&Some(left_sorted_desc), &Some(right_sorted_desc),)? + ); + assert_eq!( + ( + TableSide::None, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: true, + nulls_last: false + } + ), + graph.analyze_prunability(&Some(left_sorted_asc), &None,)? + ); + + Ok(()) + } + + // This experiment expects its input expression to be in the form: + // ( (left_column1 + a) > (b - right_column1) ) AND ( (left_column2 + c) < (right_column2 - d) ) + fn experiment_prunability_four_columns( + schema_left: &Schema, + schema_right: &Schema, + expr: Arc, + ) -> Result<()> { + let left_sorted1_asc = PhysicalSortExpr { + expr: col("left_column1", schema_left)?, + options: SortOptions::default(), + }; + let right_sorted1_asc = PhysicalSortExpr { + expr: col("right_column1", schema_right)?, + options: SortOptions::default(), + }; + let left_sorted1_desc = PhysicalSortExpr { + expr: col("left_column1", schema_left)?, + options: SortOptions::default().not(), + }; + let right_sorted1_desc = PhysicalSortExpr { + expr: col("right_column1", schema_right)?, + options: SortOptions::default().not(), + }; + let left_sorted2_asc = PhysicalSortExpr { + expr: col("left_column2", schema_left)?, + options: SortOptions::default(), + }; + let right_sorted2_asc = PhysicalSortExpr { + expr: col("right_column2", schema_right)?, + options: SortOptions::default(), + }; + let left_sorted2_desc = PhysicalSortExpr { + expr: col("left_column2", schema_left)?, + options: SortOptions::default().not(), + }; + let right_sorted2_desc = PhysicalSortExpr { + expr: col("right_column2", schema_right)?, + options: SortOptions::default().not(), + }; + + let mut graph = ExprPrunabilityGraph::try_new(expr)?; + + assert_eq!( + ( + TableSide::Left, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: true, + nulls_last: true + } + ), + graph.analyze_prunability( + &Some(left_sorted1_asc), + &Some(right_sorted1_desc), + )? + ); + assert_eq!( + ( + TableSide::None, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: true, + nulls_last: true + } + ), + graph.analyze_prunability( + &Some(left_sorted2_desc.clone()), + &Some(right_sorted2_asc.clone()), + )? + ); + assert_eq!( + ( + TableSide::Left, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: false, + nulls_last: true + } + ), + graph.analyze_prunability( + &Some(left_sorted2_desc), + &Some(right_sorted2_desc), + )? + ); + assert_eq!( + ( + TableSide::Right, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: true, + nulls_last: false + } + ), + graph + .analyze_prunability(&Some(left_sorted2_asc), &Some(right_sorted2_asc),)? + ); + assert_eq!( + ( + TableSide::Right, + SortInfo { + dir: Monotonicity::Unordered, + nulls_first: true, + nulls_last: true + } + ), + graph.analyze_prunability( + &Some(left_sorted1_desc), + &Some(right_sorted1_asc), + )? + ); + + Ok(()) + } + + // This experiment expects its input expression to be in the form: + // ( (left_column + a) >= (right_column + b) ) AND ( (right_column + c) <= (left_column - d) ) + fn experiment_sort_info( + schema_left: &Schema, + schema_right: &Schema, + expr: Arc, + ) -> Result<()> { + let left_sorted_asc = PhysicalSortExpr { + expr: col("left_column", schema_left)?, + options: SortOptions::default(), + }; + let right_sorted_asc = PhysicalSortExpr { + expr: col("right_column", schema_right)?, + options: SortOptions::default(), + }; + let left_sorted_desc = PhysicalSortExpr { + expr: col("left_column", schema_left)?, + options: SortOptions::default().not(), + }; + let right_sorted_desc = PhysicalSortExpr { + expr: col("right_column", schema_right)?, + options: SortOptions::default().not(), + }; + + let mut graph = ExprPrunabilityGraph::try_new(expr)?; + + assert_eq!( + ( + TableSide::None, + SortInfo { + dir: Monotonicity::Asc, + nulls_first: true, + nulls_last: true + } + ), + graph + .analyze_prunability(&Some(left_sorted_asc), &Some(right_sorted_desc),)? + ); + + assert_eq!( + ( + TableSide::None, + SortInfo { + dir: Monotonicity::Desc, + nulls_first: true, + nulls_last: true + } + ), + graph + .analyze_prunability(&Some(left_sorted_desc), &Some(right_sorted_asc),)? + ); + + Ok(()) + } + + #[test] + fn test_prunability_symmetric_graph() -> Result<()> { + // Create 2 schemas having an interger column + let schema_left = + Schema::new(vec![Field::new("left_column", DataType::Int32, true)]); + let schema_right = + Schema::new(vec![Field::new("right_column", DataType::Int32, true)]); + + // ( (left_column + 1) > (right_column + 2) ) AND ( (left_column + 3) < (right_column + 4) ) + let left_col = col("left_column", &schema_left)?; + let right_col = col("right_column", &schema_right)?; + let left_and_1 = Arc::new(BinaryExpr::new( + left_col.clone(), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + let left_and_2 = Arc::new(BinaryExpr::new( + right_col.clone(), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))), + )); + let right_and_1 = Arc::new(BinaryExpr::new( + left_col, + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(3)))), + )); + let right_and_2 = Arc::new(BinaryExpr::new( + right_col, + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(4)))), + )); + let left_expr = Arc::new(BinaryExpr::new(left_and_1, Operator::Gt, left_and_2)); + let right_expr = + Arc::new(BinaryExpr::new(right_and_1, Operator::Lt, right_and_2)); + let expr = Arc::new(BinaryExpr::new(left_expr, Operator::And, right_expr)); + + experiment_prunability(&schema_left, &schema_right, expr)?; + + Ok(()) + } + + #[test] + fn test_prunability_asymmetric_graph() -> Result<()> { + // Create 2 schemas having an interger column + let schema_left = + Schema::new(vec![Field::new("left_column", DataType::Int32, true)]); + let schema_right = + Schema::new(vec![Field::new("right_column", DataType::Int32, true)]); + + // ( ((left_column + 1) + 3) >= ((right_column + 2) + 4) ) AND ( (left_column) <= (right_column) ) + let left_col = col("left_column", &schema_left)?; + let right_col = col("right_column", &schema_right)?; + let left_and_1_inner = Arc::new(BinaryExpr::new( + left_col.clone(), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + let left_and_1 = Arc::new(BinaryExpr::new( + left_and_1_inner, + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(3)))), + )); + let left_and_2_inner = Arc::new(BinaryExpr::new( + right_col.clone(), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))), + )); + let left_and_2 = Arc::new(BinaryExpr::new( + left_and_2_inner, + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(4)))), + )); + let left_expr = Arc::new(BinaryExpr::new(left_and_1, Operator::GtEq, left_and_2)); + let right_expr = Arc::new(BinaryExpr::new(left_col, Operator::LtEq, right_col)); + let expr = Arc::new(BinaryExpr::new(left_expr, Operator::And, right_expr)); + + experiment_prunability(&schema_left, &schema_right, expr)?; + + Ok(()) + } + + #[test] + fn test_prunability_more_columns() -> Result<()> { + // Create 2 schemas having two interger columns + let schema_left = Schema::new(vec![ + Field::new("left_column1", DataType::Int32, true), + Field::new("left_column2", DataType::Int32, true), + ]); + let schema_right = Schema::new(vec![ + Field::new("right_column1", DataType::Int32, true), + Field::new("right_column2", DataType::Int32, true), + ]); + + // ( (left_column1 + 1) > (2 - right_column1) ) AND ( (left_column2 + 3) < (right_column2 - 4) ) + let left_col1 = col("left_column1", &schema_left)?; + let right_col1 = col("right_column1", &schema_right)?; + let left_col2 = col("left_column2", &schema_left)?; + let right_col2 = col("right_column2", &schema_right)?; + let left_and_1 = Arc::new(BinaryExpr::new( + left_col1.clone(), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + let left_and_2 = Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))), + Operator::Minus, + right_col1.clone(), + )); + let right_and_1 = Arc::new(BinaryExpr::new( + left_col2.clone(), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(3)))), + )); + let right_and_2 = Arc::new(BinaryExpr::new( + right_col2.clone(), + Operator::Minus, + Arc::new(Literal::new(ScalarValue::Int32(Some(4)))), + )); + let left_expr = Arc::new(BinaryExpr::new(left_and_1, Operator::Gt, left_and_2)); + let right_expr = + Arc::new(BinaryExpr::new(right_and_1, Operator::Lt, right_and_2)); + let expr = Arc::new(BinaryExpr::new(left_expr, Operator::And, right_expr)); + + experiment_prunability_four_columns(&schema_left, &schema_right, expr)?; + + Ok(()) + } + + #[test] + fn test_sort_info() -> Result<()> { + // Create 2 schemas having two integer columns + let schema_left = + Schema::new(vec![Field::new("left_column", DataType::Int32, true)]); + let schema_right = + Schema::new(vec![Field::new("right_column", DataType::Int32, true)]); + + // ( (left_column + 1) >= (right_column + 2) ) AND ( (right_column + 3) <= (left_column - 4) ) + let left_col = col("left_column", &schema_left)?; + let right_col = col("right_column", &schema_right)?; + let expr1 = Arc::new(BinaryExpr::new( + left_col.clone(), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + let expr2 = Arc::new(BinaryExpr::new( + right_col.clone(), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))), + )); + let expr3 = Arc::new(BinaryExpr::new( + right_col, + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(3)))), + )); + let expr4 = Arc::new(BinaryExpr::new( + left_col, + Operator::Minus, + Arc::new(Literal::new(ScalarValue::Int32(Some(4)))), + )); + let left_expr = Arc::new(BinaryExpr::new(expr1, Operator::GtEq, expr2)); + let right_expr = Arc::new(BinaryExpr::new(expr3, Operator::LtEq, expr4)); + let expr = Arc::new(BinaryExpr::new(left_expr, Operator::And, right_expr)); + + experiment_sort_info(&schema_left, &schema_right, expr)?; + + Ok(()) + } +}