Skip to content

Commit

Permalink
Feature/determine prunability (#139)
Browse files Browse the repository at this point in the history
* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
3 people authored and metesynnada committed Jul 21, 2023
1 parent 8668da6 commit cf56105
Show file tree
Hide file tree
Showing 4 changed files with 1,051 additions and 3 deletions.
108 changes: 105 additions & 3 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::intervals::prunability::{ExprPrunabilityGraph, TableSide};
use datafusion_physical_expr::intervals::{check_support, is_datatype_supported};
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;

/// The PipelineChecker rule rejects non-runnable query plans that use
Expand Down Expand Up @@ -138,7 +142,8 @@ pub fn check_finiteness_requirements(
) -> Result<Transformed<PipelineStatePropagator>> {
if let Some(exec) = input.plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
if !(optimizer_options.allow_symmetric_joins_without_pruning
|| (exec.check_if_order_information_available()? && is_prunable(exec)))
|| (exec.check_if_order_information_available()?
&& is_prunable(exec, &input.children_unbounded)))
{
const MSG: &str = "Join operation cannot operate on a non-prunable stream without enabling \
the 'allow_symmetric_joins_without_pruning' configuration flag";
Expand All @@ -161,17 +166,114 @@ pub fn check_finiteness_requirements(
///
/// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
/// [`Operator`]: datafusion_expr::Operator
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
join.filter().map_or(false, |filter| {
fn is_prunable(join: &SymmetricHashJoinExec, children_unbounded: &[bool]) -> bool {
if join.filter().map_or(false, |filter| {
check_support(filter.expression())
&& filter
.schema()
.fields()
.iter()
.all(|f| is_datatype_supported(f.data_type()))
}) {
if let Some(filter) = join.filter() {
if let Ok(mut graph) =
ExprPrunabilityGraph::try_new((*filter.expression()).clone())
{
let left_sort_expr = join.left().output_ordering().map(|s| s[0].clone());
let right_sort_expr =
join.right().output_ordering().map(|s| s[0].clone());
let new_left_sort = get_sort_expr_in_filter_schema(
&left_sort_expr,
filter,
JoinSide::Left,
);
let new_right_sort = get_sort_expr_in_filter_schema(
&right_sort_expr,
filter,
JoinSide::Right,
);
if let Ok((table_side, _)) =
graph.analyze_prunability(&new_left_sort, &new_right_sort)
{
return prunability_for_unbounded_tables(
children_unbounded[0],
children_unbounded[1],
&table_side,
);
}
}
}
}
false
}

// Updates index of the column with the new index (if PhysicalExpr is Column)
fn update_column_index(
sort_expr: &Option<PhysicalSortExpr>,
updated_idx: usize,
) -> Option<PhysicalSortExpr> {
sort_expr.as_ref().and_then(|sort_expr| {
sort_expr
.expr
.as_any()
.downcast_ref::<Column>()
.map(|column| {
let sort_name = column.name();
let options = sort_expr.options;
let expr = Arc::new(Column::new(sort_name, updated_idx));
PhysicalSortExpr { expr, options }
})
})
}

fn get_sort_expr_in_filter_schema(
sort_expr: &Option<PhysicalSortExpr>,
filter: &JoinFilter,
side: JoinSide,
) -> Option<PhysicalSortExpr> {
let sorted_column_index_in_filter = find_index_in_filter(filter, sort_expr, side);
sorted_column_index_in_filter.and_then(|idx| update_column_index(sort_expr, idx))
}

fn find_index_in_filter(
join_filter: &JoinFilter,
left_sort_expr: &Option<PhysicalSortExpr>,
join_side: JoinSide,
) -> Option<usize> {
for (i, (field, column_index)) in join_filter
.schema()
.fields()
.iter()
.zip(join_filter.column_indices())
.enumerate()
{
if let Some(physical_sort) = left_sort_expr {
if let Some(column) = physical_sort.expr.as_any().downcast_ref::<Column>() {
if column.name() == field.name() && column_index.side == join_side {
return Some(i);
}
}
}
}
None
}

fn prunability_for_unbounded_tables(
left_unbounded: bool,
right_unbounded: bool,
table_side: &TableSide,
) -> bool {
let (left_prunable, right_prunable) = match table_side {
TableSide::Left => (true, false),
TableSide::Right => (false, true),
TableSide::Both => (true, true),
TableSide::None => (false, false),
};
// If both sides are either bounded or prunable, return true (Can do calculations with bounded memory)
// Otherwise return false (Cannot do calculations with bounded memory)
(!left_unbounded || left_prunable) && (!right_unbounded || right_prunable)
}

#[cfg(test)]
mod sql_tests {
use super::*;
Expand Down
53 changes: 53 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -590,3 +590,56 @@ drop table IF EXISTS full_join_test;
# batch size
statement ok
set datafusion.execution.batch_size = 8192;

# test joins give error with unbounded tables by the analysis of prunability
statement ok
CREATE unbounded external table t1(a1 integer, a2 integer, a3 integer)
STORED as CSV
WITH HEADER ROW
WITH ORDER (a2 ASC, a3 ASC)
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/empty.csv';

statement ok
CREATE unbounded external table t2(a1 integer, a2 integer, a3 integer)
STORED as CSV
WITH HEADER ROW
WITH ORDER (a2 ASC, a3 ASC)
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/empty.csv';

statement ok
set datafusion.optimizer.allow_symmetric_joins_without_pruning = false

# query with a filter causing table to be not prunable
query error DataFusion error: PipelineChecker
SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3
FROM t1
JOIN t2
ON t1.a1 = t2.a1
WHERE t1.a3 < t2.a3 AND t1.a3 >= t2.a3

# query with a filter causing table to be prunable
statement ok
SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3
FROM t1
JOIN t2
ON t1.a1 = t2.a1
WHERE t2.a2 >= t1.a2 AND t1.a2 > t2.a2

statement ok
set datafusion.optimizer.allow_symmetric_joins_without_pruning = true

# query with a filter causing table to be not prunable, but the join is allowed
statement ok
SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3
FROM t1
JOIN t2
ON t1.a1 = t2.a1
WHERE t1.a2 < t2.a2 AND t1.a2 < t2.a2

statement ok
drop table t1;

statement ok
drop table t2;
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/intervals/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
pub mod cp_solver;
pub mod interval_aritmetic;
pub mod prunability;
pub mod rounding;

pub mod test_utils;
Expand Down
Loading

0 comments on commit cf56105

Please sign in to comment.