Skip to content

Commit

Permalink
Prunability of Join Filter Physical Expressions (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada authored and mustafasrepo committed Dec 21, 2023
1 parent cb14153 commit 841878c
Show file tree
Hide file tree
Showing 5 changed files with 2,310 additions and 89 deletions.
37 changes: 25 additions & 12 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::utils::is_filter_expr_prunable;
use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

Expand Down Expand Up @@ -140,7 +141,8 @@ pub fn check_finiteness_requirements(
) -> Result<Transformed<PipelineStatePropagator>> {
if let Some(exec) = input.plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
if !(optimizer_options.allow_symmetric_joins_without_pruning
|| (exec.check_if_order_information_available()? && is_prunable(exec)))
|| (exec.check_if_order_information_available()?
&& is_prunable(exec, &input.children_unbounded)))
{
const MSG: &str = "Join operation cannot operate on a non-prunable stream without enabling \
the 'allow_symmetric_joins_without_pruning' configuration flag";
Expand All @@ -156,21 +158,32 @@ pub fn check_finiteness_requirements(
})
}

/// This function returns whether a given symmetric hash join is amenable to
/// data pruning. For this to be possible, it needs to have a filter where
/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
/// interval calculations.
///
/// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
/// [`Operator`]: datafusion_expr::Operator
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
/// This function returns whether a given symmetric hash join is amenable to data pruning.
fn is_prunable(join: &SymmetricHashJoinExec, children_unbounded: &[bool]) -> bool {
join.filter().map_or(false, |filter| {
check_support(filter.expression(), &join.schema())
&& filter
if !check_support(filter.expression(), &join.schema())
|| filter
.schema()
.fields()
.iter()
.all(|f| is_datatype_supported(f.data_type()))
.any(|f| !is_datatype_supported(f.data_type()))
{
return false;
}
let prunable_sides = is_filter_expr_prunable(
filter,
// Get the left leading order
join.left.output_ordering().map(|arr| arr[0].clone()),
// Get the right leading order
join.right.output_ordering().map(|arr| arr[0].clone()),
|| join.left.equivalence_properties(),
|| join.left.ordering_equivalence_properties(),
|| join.right.equivalence_properties(),
|| join.right.ordering_equivalence_properties(),
)
.unwrap_or((false, false));
(prunable_sides.1 || !children_unbounded[1])
&& (prunable_sides.0 || !children_unbounded[0])
})
}

Expand Down
48 changes: 48 additions & 0 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,33 @@ use crate::PhysicalExpr;

use arrow::array::*;
use arrow::compute::cast;
use arrow::compute::eq_dyn_bool_scalar;
use arrow::compute::gt_dyn_bool_scalar;
use arrow::compute::gt_eq_dyn_bool_scalar;
use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
use arrow::compute::kernels::cmp::*;
use arrow::compute::kernels::comparison::regexp_is_match_utf8;
use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar;
use arrow::compute::kernels::comparison::{
eq_dyn_binary_scalar, gt_dyn_binary_scalar, gt_eq_dyn_binary_scalar,
lt_dyn_binary_scalar, lt_eq_dyn_binary_scalar, neq_dyn_binary_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_scalar, gt_dyn_scalar, gt_eq_dyn_scalar, lt_dyn_scalar, lt_eq_dyn_scalar,
neq_dyn_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, lt_dyn_utf8_scalar,
lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
};
use arrow::compute::kernels::concat_elements::concat_elements_utf8;
use arrow::compute::lt_dyn_bool_scalar;
use arrow::compute::lt_eq_dyn_bool_scalar;
use arrow::compute::neq_dyn_bool_scalar;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;

use arrow_array::{Datum, LargeStringArray, Scalar};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::interval_arithmetic::{apply_operator, Interval};
Expand Down Expand Up @@ -133,6 +152,35 @@ macro_rules! compute_utf8_op {
}};
}

/// Invoke a compute kernel on a data array and a scalar value
macro_rules! compute_utf8_op_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident, $OP_TYPE:expr) => {{
let ll = $LEFT
.as_any()
.downcast_ref::<$DT>()
.expect("compute_op failed to downcast left side array");
if let ScalarValue::Utf8(Some(string_value))
| ScalarValue::LargeUtf8(Some(string_value)) = $RIGHT
{
Ok(Arc::new(paste::expr! {[<$OP _utf8_scalar>]}(
&ll,
&string_value,
)?))
} else if $RIGHT.is_null() {
Ok(Arc::new(arrow::array::new_null_array(
$OP_TYPE,
$LEFT.len(),
)))
} else {
internal_err!(
"compute_utf8_op_scalar for '{}' failed to cast literal value {}",
stringify!($OP),
$RIGHT
)
}
}};
}

macro_rules! binary_string_array_op {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
match $LEFT.data_type() {
Expand Down
12 changes: 11 additions & 1 deletion datafusion/physical-expr/src/expressions/like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ use std::{any::Any, sync::Arc};
use crate::{physical_expr::down_cast_any_ref, PhysicalExpr};

use crate::expressions::datum::apply_cmp;
use arrow::record_batch::RecordBatch;
use arrow::compute::kernels::comparison::{
ilike_utf8, like_utf8, nilike_utf8, nlike_utf8,
};
use arrow::compute::kernels::comparison::{
ilike_utf8_scalar, like_utf8_scalar, nilike_utf8_scalar, nlike_utf8_scalar,
};
use arrow::{
array::{Array, ArrayRef, StringArray},
record_batch::RecordBatch,
};
use arrow_array::LargeStringArray;
use arrow_schema::{DataType, Schema};
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ pub use sort_expr::{
LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalSortExpr,
PhysicalSortRequirement,
};
pub use utils::{reverse_order_bys, split_conjunction};
pub use sort_properties::{update_ordering, SortProperties};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_out_expr_with_columns_map, reverse_order_bys, split_conjunction,
};
Loading

0 comments on commit 841878c

Please sign in to comment.