POC Rewrite bloom filter pruning predicate logic in terms of intervals #14

Closed
214 changes: 142 additions & 72 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -19,7 +19,8 @@ use arrow::{
array::ArrayRef,
datatypes::{DataType, Schema},
};
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use arrow_schema::SchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
@@ -31,6 +32,9 @@ use std::{
sync::Arc,
};

use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};

use crate::datasource::{
listing::FileRange,
physical_plan::parquet::{from_bytes_to_i128, parquet_to_arrow_decimal_type},
@@ -117,8 +121,10 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) -> Vec<usize> {
let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
{
let bf_predicates = match BloomFilterPruningPredicate::try_new(
predicate.schema().clone(),
predicate.orig_expr(),
) {
Ok(predicates) => predicates,
Err(_) => {
return row_groups.to_vec();
@@ -168,93 +174,147 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
}

struct BloomFilterPruningPredicate {
/// Actual pruning predicate (rewritten in terms of column min/max statistics)
predicate_expr: Option<phys_expr::BinaryExpr>,
/// The input schema against which the predicate will be evaluated
schema: SchemaRef,

/// predicate
predicate_expr: Arc<dyn PhysicalExpr>,
/// The statistics required to evaluate this predicate
required_columns: Vec<String>,
}

impl BloomFilterPruningPredicate {
fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
match binary_expr {
Some(binary_expr) => {
let columns = Self::get_predicate_columns(expr);
Ok(Self {
predicate_expr: Some(binary_expr.clone()),
required_columns: columns.into_iter().collect(),
})
}
None => Err(DataFusionError::Execution(
fn try_new(schema: SchemaRef, expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
let columns = Self::get_predicate_columns(expr);
if columns.is_empty() {
return Err(DataFusionError::Execution(
"BloomFilterPruningPredicate only support binary expr".to_string(),
)),
));
}
}

fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
Ok(Self {
schema,
predicate_expr: expr.clone(),
required_columns: columns.into_iter().collect(),
})
}

/// filter the expr with bloom filter return true if the expr can be pruned
fn prune_expr_with_bloom_filter(
expr: Option<&phys_expr::BinaryExpr>,
column_sbbf: &HashMap<String, Sbbf>,
) -> bool {
if expr.is_none() {
fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
// rewrite any <col = literal> exprs to `false` if we can prove, using the bloom filter, that the value is not present.
alamb marked this conversation as resolved.
let rewritten = self
Owner Author
I am quite pleased with this logic. It rewrites the predicate String@0 = Hello_Not_Exists AND 1 = 1 to false AND 1 = 1, and the interval analysis then determines that the expression is always false:

Evaluating. self.predicate_expr: String@0 = Hello_Not_Exists AND 1 = 1
   rewritten false AND 1 = 1
   interval result: [false, false]
lower: IntervalBound { value: Boolean(false), open: false }
upper: IntervalBound { value: Boolean(false), open: false }

However, it seems I need to update the interval analysis to handle OR, which currently yields "unknown" bounds:

Evaluating. self.predicate_expr: String@0 = Hello_Not_Exists OR String@0 = Hello_Not_Exists2
   rewritten false OR false
   interval result: (NULL, NULL)
lower: IntervalBound { value: NULL, open: true }
upper: IntervalBound { value: NULL, open: true }
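
The rewrite step described above can be sketched without any DataFusion types. This is only an illustration under stated assumptions: `Expr`, `FakeFilter` and `rewrite` are hypothetical names, the "filter" is an exact set rather than a real `Sbbf`, and the `1 = 1` conjunct is stood in for by a boolean literal.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified predicate tree (not DataFusion's `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// `<column> = <string literal>`
    ColEq(String, String),
    /// A boolean literal
    Lit(bool),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Stand-in for a bloom filter (assumption: an exact set, so `check` never
/// gives a false positive). A real filter may answer `true` for an absent
/// value, but never `false` for a present one.
struct FakeFilter(HashSet<String>);

impl FakeFilter {
    fn check(&self, value: &str) -> bool {
        self.0.contains(value)
    }
}

/// Bottom-up rewrite: replace `col = value` with `false` when the filter
/// for `col` proves that `value` is absent; leave everything else unchanged.
fn rewrite(expr: Expr, filters: &HashMap<String, FakeFilter>) -> Expr {
    match expr {
        Expr::And(l, r) => Expr::And(
            Box::new(rewrite(*l, filters)),
            Box::new(rewrite(*r, filters)),
        ),
        Expr::Or(l, r) => Expr::Or(
            Box::new(rewrite(*l, filters)),
            Box::new(rewrite(*r, filters)),
        ),
        Expr::ColEq(col, value) => {
            // No filter for this column means nothing can be proven.
            let absent = filters.get(&col).map_or(false, |f| !f.check(&value));
            if absent {
                Expr::Lit(false)
            } else {
                Expr::ColEq(col, value)
            }
        }
        // Literals are already in their final form.
        lit => lit,
    }
}
```

Only equality leaves are touched here; deciding whether the whole predicate is always false is left to a separate analysis pass, which is what the interval evaluation does in this PR.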

Owner Author
Added support for OR here: apache#7884
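
For intuition, here is a minimal sketch of how AND and OR could combine boolean intervals. It is not the implementation in the linked PR: `BoolInterval` and its methods are hypothetical, the bounds are always closed, and NULL is ignored entirely.

```rust
/// Hypothetical boolean interval: the range of truth values an expression
/// may take, with closed bounds and `false < true`. NULL is not modelled.
#[derive(Debug, Clone, Copy, PartialEq)]
struct BoolInterval {
    lower: bool,
    upper: bool,
}

impl BoolInterval {
    const CERTAINLY_FALSE: Self = Self { lower: false, upper: false };
    const CERTAINLY_TRUE: Self = Self { lower: true, upper: true };
    const UNCERTAIN: Self = Self { lower: false, upper: true };

    /// AND is monotone in both arguments, so it can be applied bound-wise.
    fn and(self, other: Self) -> Self {
        Self {
            lower: self.lower && other.lower,
            upper: self.upper && other.upper,
        }
    }

    /// OR is monotone as well, so the same bound-wise approach works.
    fn or(self, other: Self) -> Self {
        Self {
            lower: self.lower || other.lower,
            upper: self.upper || other.upper,
        }
    }

    /// True only when both bounds are `false`.
    fn is_always_false(self) -> bool {
        self == Self::CERTAINLY_FALSE
    }
}
```

Under these assumptions, an AND is always false as soon as either side is, while an OR is always false only when both sides are. That matches the `left || right` and `left && right` rules in the recursive code this PR replaces.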

.predicate_expr
.clone()
.transform_up(&|expr| {
let Some(binary_expr) =
expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
else {
return Ok(Transformed::No(expr));
};

// is this a <col> = <constant> expression?
let Some((col, val)) = Self::check_expr_is_col_equal_const(binary_expr)
else {
return Ok(Transformed::No(expr));
};

// do we have a bloom filter for this column?
let Some(sbbf) = column_sbbf.get(col.name()) else {
return Ok(Transformed::No(expr));
};

// Can we tell this value is not present by checking the bloom filter?
let filtered = match val {
ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
_ => false,
};

if filtered {
// rewrote col = <value> --> false
Ok(Transformed::Yes(lit(false)))
} else {
// could not rule this out based on the bloom filter
Ok(Transformed::No(expr))
}
})
// error is never returned in closure
.unwrap();

// Run boundary analysis on this predicate, to see if we can prove it is false
println!("Evaluating. self.predicate_expr: {}", self.predicate_expr);
println!(" rewritten {rewritten}");
let Ok(mut graph) = ExprIntervalGraph::try_new(rewritten.clone()) else {
return false;
}
let expr = expr.unwrap();
match expr.op() {
Operator::And => {
let left = Self::prune_expr_with_bloom_filter(
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
let right = Self::prune_expr_with_bloom_filter(
expr.right()
.as_any()
.downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
left || right
}
Operator::Or => {
let left = Self::prune_expr_with_bloom_filter(
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
let right = Self::prune_expr_with_bloom_filter(
expr.right()
.as_any()
.downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
left && right
}
Operator::Eq => {
if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
if let Some(sbbf) = column_sbbf.get(col.name()) {
match val {
ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
_ => false,
};

// with no a priori knowledge, can we determine that the predicate is always false?
let Ok(interval) = graph.evaluate_bounds() else {
return false;
};
println!(" interval result: {interval}");

is_always_false_interval(interval)
}

/*
match expr.op() {
Operator::And => {
let left = Self::prune_expr_with_bloom_filter(
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
let right = Self::prune_expr_with_bloom_filter(
expr.right()
.as_any()
.downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
left || right
}
Operator::Or => {
let left = Self::prune_expr_with_bloom_filter(
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
let right = Self::prune_expr_with_bloom_filter(
expr.right()
.as_any()
.downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
left && right
}
Operator::Eq => {
if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
if let Some(sbbf) = column_sbbf.get(col.name()) {
match val {
ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
_ => false,
}
} else {
false
}
} else {
false
}
} else {
false
}
_ => false,
}
_ => false,
}
}
*/

fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
let mut columns = HashSet::new();
@@ -301,6 +361,16 @@ impl BloomFilterPruningPredicate {
}
}

/// Returns true if the interval is always false
fn is_always_false_interval(interval: &Interval) -> bool {
println!("lower: {:?}", interval.lower);
println!("upper: {:?}", interval.upper);
!interval.lower.open
&& matches!(interval.lower.value, ScalarValue::Boolean(Some(false)))
&& !interval.upper.open
&& matches!(interval.upper.value, ScalarValue::Boolean(Some(false)))
}

/// Wraps parquet statistics in a way
/// that implements [`PruningStatistics`]
struct RowGroupPruningStatistics<'a> {
8 changes: 7 additions & 1 deletion datafusion/physical-expr/src/expressions/binary.rs
@@ -349,7 +349,13 @@ impl PhysicalExpr for BinaryExpr {
// upon adding support for additional logical operators, this
// method will require modification to support propagating the
// changes accordingly.
return Ok(vec![]);
if matches!(self.op, Operator::And) {
return Ok(vec![]);
} else if matches!(self.op, Operator::Or) {
todo!();
} else {
unreachable!()
}
} else if self.op.is_comparison_operator() {
if interval == &Interval::CERTAINLY_FALSE {
// TODO: We will handle strictly false clauses by negating