From 66e212ce3590f0a169825ca3381c4688d493cf60 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 6 Dec 2023 10:18:20 -0500 Subject: [PATCH 1/4] Implement `contains` API in PruningPredicate --- .../physical_plan/parquet/page_filter.rs | 11 +- .../physical_plan/parquet/row_groups.rs | 9 + .../core/src/physical_optimizer/pruning.rs | 1072 +++++++++++++---- 3 files changed, 856 insertions(+), 236 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 42bfef35996e..f6310c49bcd6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -23,7 +23,7 @@ use arrow::array::{ }; use arrow::datatypes::DataType; use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError}; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; use log::{debug, trace}; @@ -37,6 +37,7 @@ use parquet::{ }, format::PageLocation, }; +use std::collections::HashSet; use std::sync::Arc; use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; @@ -554,4 +555,12 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> { ))), } } + + fn contained( + &self, + _column: &datafusion_common::Column, + _values: &HashSet, + ) -> Option { + None + } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 7c3f7d9384ab..09e4907c9437 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -16,6 +16,7 @@ // under the License. use arrow::{array::ArrayRef, datatypes::Schema}; +use arrow_array::BooleanArray; use arrow_schema::FieldRef; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{Column, DataFusionError, Result, ScalarValue}; @@ -340,6 +341,14 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count())); scalar.to_array().ok() } + + fn contained( + &self, + _column: &Column, + _values: &HashSet, + ) -> Option { + None + } } #[cfg(test)] diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index b2ba7596db8d..5fabf6a4b501 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -35,12 +35,13 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; -use datafusion_common::{downcast_value, plan_datafusion_err, ScalarValue}; +use arrow_array::cast::AsArray; use datafusion_common::{ internal_err, plan_err, tree_node::{Transformed, TreeNode}, }; -use datafusion_physical_expr::utils::collect_columns; +use datafusion_common::{plan_datafusion_err, ScalarValue}; +use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; use log::trace; @@ -93,6 +94,30 @@ pub trait PruningStatistics { /// /// Note: the returned array must contain [`Self::num_containers`] rows fn null_counts(&self, column: &Column) -> Option; + + /// Returns an array where each row represents information known about + /// the `values` contained in a column. + /// + /// This API is designed to be used along with [`LiteralGuarantee`] to prove + /// that predicates can not possibly evaluate to `true` and thus prune + /// containers. For example, Parquet Bloom Filters can prove that values are + /// not present. + /// + /// The returned array has one row for each container, with the following + /// meanings: + /// * `true` if the values in `column` ONLY contain values from `values` + /// * `false` if the values in `column` are NOT ANY of `values` + /// * `null` if the neither of the above holds or is unknown. + /// + /// If these statistics can not determine column membership for any + /// container, return `None` (the default). + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn contained( + &self, + column: &Column, + values: &HashSet, + ) -> Option; } /// Evaluates filter expressions on statistics such as min/max values and null @@ -142,12 +167,17 @@ pub trait PruningStatistics { pub struct PruningPredicate { /// The input schema against which the predicate will be evaluated schema: SchemaRef, - /// Actual pruning predicate (rewritten in terms of column min/max statistics) + /// A min/max pruning predicate (rewritten in terms of column min/max + /// values, which are supplied by statistics) predicate_expr: Arc, - /// The statistics required to evaluate this predicate - required_columns: RequiredStatColumns, - /// Original physical predicate from which this predicate expr is derived (required for serialization) + /// Description of which statistics are required to evaluate `predicate_expr` + required_columns: RequiredColumns, + /// Original physical predicate from which this predicate expr is derived + /// (required for serialization) orig_expr: Arc, + /// [`LiteralGuarantee`]s that are used to try and prove a predicate can not + /// possibly evaluate to `true`. + literal_guarantees: Vec, } impl PruningPredicate { @@ -172,14 +202,18 @@ impl PruningPredicate { /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))` pub fn try_new(expr: Arc, schema: SchemaRef) -> Result { // build predicate expression once - let mut required_columns = RequiredStatColumns::new(); + let mut required_columns = RequiredColumns::new(); let predicate_expr = build_predicate_expression(&expr, schema.as_ref(), &mut required_columns); + + let literal_guarantees = LiteralGuarantee::analyze(&expr); + Ok(Self { schema, predicate_expr, required_columns, orig_expr: expr, + literal_guarantees, }) } @@ -198,40 +232,47 @@ impl PruningPredicate { /// /// [`ExprSimplifier`]: crate::optimizer::simplify_expressions::ExprSimplifier pub fn prune(&self, statistics: &S) -> Result> { + let mut builder = BoolVecBuilder::new(statistics.num_containers()); + + // Try to prove the predicate can't be true for the containers based on + // literal guarantees + for literal_guarantee in &self.literal_guarantees { + let LiteralGuarantee { + column, + guarantee, + literals, + } = literal_guarantee; + if let Some(results) = statistics.contained(column, literals) { + match guarantee { + // `In` means the values in the column must be one of the + // values in the set for the predicate to evaluate to true. + // If `contained` returns false, that means the column is + // not any of the values so we can prune the container + Guarantee::In => builder.append_array(&results), + // `NotIn` means the values in the column must must not be + // any of the values in the set for the predicate to + // evaluate to true. If contained returns true, it means the + // column is only in the set of values so we can prune the + // container + Guarantee::NotIn => { + builder.append_array(&arrow::compute::not(&results)?) + } + } + } + } + + // Next, try to prove the predicate can't be true for the containers based + // on min/max values + // build a RecordBatch that contains the min/max values in the - // appropriate statistics columns + // appropriate statistics columns for the min/max predicate let statistics_batch = build_statistics_record_batch(statistics, &self.required_columns)?; - // Evaluate the pruning predicate on that record batch. - // - // Use true when the result of evaluating a predicate - // expression on a row group is null (aka `None`). Null can - // arise when the statistics are unknown or some calculation - // in the predicate means we don't know for sure if the row - // group can be filtered out or not. To maintain correctness - // the row group must be kept and thus `true` is returned. - match self.predicate_expr.evaluate(&statistics_batch)? { - ColumnarValue::Array(array) => { - let predicate_array = downcast_value!(array, BooleanArray); + // Evaluate the pruning predicate on that record batch and append any results to the builder + builder.append_value(self.predicate_expr.evaluate(&statistics_batch)?); - Ok(predicate_array - .into_iter() - .map(|x| x.unwrap_or(true)) // None -> true per comments above - .collect::>()) - } - // result was a column - ColumnarValue::Scalar(ScalarValue::Boolean(v)) => { - let v = v.unwrap_or(true); // None -> true per comments above - Ok(vec![v; statistics.num_containers()]) - } - other => { - internal_err!( - "Unexpected result of pruning predicate evaluation. Expected Boolean array \ - or scalar but got {other:?}" - ) - } - } + Ok(builder.build()) } /// Return a reference to the input schema @@ -254,9 +295,91 @@ impl PruningPredicate { is_always_true(&self.predicate_expr) } - pub(crate) fn required_columns(&self) -> &RequiredStatColumns { + pub(crate) fn required_columns(&self) -> &RequiredColumns { &self.required_columns } + + /// Names of the columns that are known to be / not be in a set + /// of literals (constants). These are the columns the that may be passed to + /// [`PruningStatistics::contained`] during pruning. + /// + /// This is useful to avoid fetching statistics for columns that will not be + /// used in the predicate. For example, it can be used to avoid reading + /// uneeded bloom filters (a non trivial operation). + pub fn literal_columns(&self) -> Vec { + let mut seen = HashSet::new(); + self.literal_guarantees + .iter() + .map(|e| &e.column.name) + // avoid duplicates + .filter(|name| seen.insert(*name)) + .map(|s| s.to_string()) + .collect() + } +} + +/// Builds the return `Vec` for [`PruningPredicate::prune`]. +#[derive(Debug)] +struct BoolVecBuilder { + /// One element per container. Each element is + /// * `true`: if the container has row that may pass the predicate + /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate + inner: Vec, +} + +impl BoolVecBuilder { + /// Create a new `BoolVecBuilder` with `num_containers` elements + fn new(num_containers: usize) -> Self { + Self { + // assume by default all containers may pass the predicate + inner: vec![true; num_containers], + } + } + + /// Combines result `array` for a conjunct (e.g. `AND` clause) of a + /// predicate into the currently in progress array. + /// + /// Each `array` element is: + /// * `true`: container has row that may pass the predicate + /// * `false`: all container rows DEFINITELY DO NOT pass the predicate + /// * `null`: container may or may not have rows that pass the predicate + fn append_array(&mut self, array: &BooleanArray) { + assert_eq!(array.len(), self.inner.len()); + for (cur, new) in self.inner.iter_mut().zip(array.iter()) { + // `false` for this conjunct means we know for sure no rows could + // pass the predicate and thus we set the corresponding container + // location to false. + if let Some(false) = new { + *cur = false; + } + } + } + + /// Combines the results in the [`ColumnarValue`] to the currently in + /// progress array, following the same rules as [`Self::append_array`]. + /// + /// # Panics + /// If `value` is not boolean + fn append_value(&mut self, value: ColumnarValue) { + match value { + ColumnarValue::Array(array) => { + self.append_array(array.as_boolean()); + } + ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => { + // False means all containers can not pass the predicate + self.inner = vec![false; self.inner.len()]; + } + _ => { + // Null or true means the rows in container may pass this + // conjunct so we can't prune any containers based on that + } + } + } + + /// Convert this builder into a Vec of bools + fn build(self) -> Vec { + self.inner + } } fn is_always_true(expr: &Arc) -> bool { @@ -276,21 +399,21 @@ fn is_always_true(expr: &Arc) -> bool { /// Handles creating references to the min/max statistics /// for columns as well as recording which statistics are needed #[derive(Debug, Default, Clone)] -pub(crate) struct RequiredStatColumns { +pub(crate) struct RequiredColumns { /// The statistics required to evaluate this predicate: /// * The unqualified column in the input schema /// * Statistics type (e.g. Min or Max or Null_Count) /// * The field the statistics value should be placed in for - /// pruning predicate evaluation + /// pruning predicate evaluation (e.g. `min_value` or `max_value`) columns: Vec<(phys_expr::Column, StatisticsType, Field)>, } -impl RequiredStatColumns { +impl RequiredColumns { fn new() -> Self { Self::default() } - /// Returns number of unique columns. + /// Returns number of unique columns pub(crate) fn n_columns(&self) -> usize { self.iter() .map(|(c, _s, _f)| c) @@ -344,11 +467,10 @@ impl RequiredStatColumns { // only add statistics column if not previously added if need_to_insert { - let stat_field = Field::new( - stat_column.name(), - field.data_type().clone(), - field.is_nullable(), - ); + // may be null if statistics are not present + let nullable = true; + let stat_field = + Field::new(stat_column.name(), field.data_type().clone(), nullable); self.columns.push((column.clone(), stat_type, stat_field)); } rewrite_column_expr(column_expr.clone(), column, &stat_column) @@ -391,7 +513,7 @@ impl RequiredStatColumns { } } -impl From> for RequiredStatColumns { +impl From> for RequiredColumns { fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self { Self { columns } } @@ -424,7 +546,7 @@ impl From> for RequiredStatColum /// ``` fn build_statistics_record_batch( statistics: &S, - required_columns: &RequiredStatColumns, + required_columns: &RequiredColumns, ) -> Result { let mut fields = Vec::::new(); let mut arrays = Vec::::new(); @@ -480,7 +602,7 @@ struct PruningExpressionBuilder<'a> { op: Operator, scalar_expr: Arc, field: &'a Field, - required_columns: &'a mut RequiredStatColumns, + required_columns: &'a mut RequiredColumns, } impl<'a> PruningExpressionBuilder<'a> { @@ -489,7 +611,7 @@ impl<'a> PruningExpressionBuilder<'a> { right: &'a Arc, op: Operator, schema: &'a Schema, - required_columns: &'a mut RequiredStatColumns, + required_columns: &'a mut RequiredColumns, ) -> Result { // find column name; input could be a more complicated expression let left_columns = collect_columns(left); @@ -704,7 +826,7 @@ fn reverse_operator(op: Operator) -> Result { fn build_single_column_expr( column: &phys_expr::Column, schema: &Schema, - required_columns: &mut RequiredStatColumns, + required_columns: &mut RequiredColumns, is_not: bool, // if true, treat as !col ) -> Option> { let field = schema.field_with_name(column.name()).ok()?; @@ -745,7 +867,7 @@ fn build_single_column_expr( fn build_is_null_column_expr( expr: &Arc, schema: &Schema, - required_columns: &mut RequiredStatColumns, + required_columns: &mut RequiredColumns, ) -> Option> { if let Some(col) = expr.as_any().downcast_ref::() { let field = schema.field_with_name(col.name()).ok()?; @@ -775,7 +897,7 @@ fn build_is_null_column_expr( fn build_predicate_expression( expr: &Arc, schema: &Schema, - required_columns: &mut RequiredStatColumns, + required_columns: &mut RequiredColumns, ) -> Arc { // Returned for unsupported expressions. Such expressions are // converted to TRUE. @@ -984,7 +1106,7 @@ mod tests { use std::collections::HashMap; use std::ops::{Not, Rem}; - #[derive(Debug)] + #[derive(Debug, Default)] /// Mock statistic provider for tests /// /// Each row represents the statistics for a "container" (which @@ -993,95 +1115,139 @@ mod tests { /// /// Note All `ArrayRefs` must be the same size. struct ContainerStats { - min: ArrayRef, - max: ArrayRef, + min: Option, + max: Option, /// Optional values null_counts: Option, + /// Optional known values (e.g. mimic a bloom filter) + /// (value, contained) + /// If present, all BooleanArrays must be the same size as min/max + contained: Vec<(HashSet, BooleanArray)>, } impl ContainerStats { + fn new() -> Self { + Default::default() + } fn new_decimal128( min: impl IntoIterator>, max: impl IntoIterator>, precision: u8, scale: i8, ) -> Self { - Self { - min: Arc::new( + Self::new() + .with_min(Arc::new( min.into_iter() .collect::() .with_precision_and_scale(precision, scale) .unwrap(), - ), - max: Arc::new( + )) + .with_max(Arc::new( max.into_iter() .collect::() .with_precision_and_scale(precision, scale) .unwrap(), - ), - null_counts: None, - } + )) } fn new_i64( min: impl IntoIterator>, max: impl IntoIterator>, ) -> Self { - Self { - min: Arc::new(min.into_iter().collect::()), - max: Arc::new(max.into_iter().collect::()), - null_counts: None, - } + Self::new() + .with_min(Arc::new(min.into_iter().collect::())) + .with_max(Arc::new(max.into_iter().collect::())) } fn new_i32( min: impl IntoIterator>, max: impl IntoIterator>, ) -> Self { - Self { - min: Arc::new(min.into_iter().collect::()), - max: Arc::new(max.into_iter().collect::()), - null_counts: None, - } + Self::new() + .with_min(Arc::new(min.into_iter().collect::())) + .with_max(Arc::new(max.into_iter().collect::())) } fn new_utf8<'a>( min: impl IntoIterator>, max: impl IntoIterator>, ) -> Self { - Self { - min: Arc::new(min.into_iter().collect::()), - max: Arc::new(max.into_iter().collect::()), - null_counts: None, - } + Self::new() + .with_min(Arc::new(min.into_iter().collect::())) + .with_max(Arc::new(max.into_iter().collect::())) } fn new_bool( min: impl IntoIterator>, max: impl IntoIterator>, ) -> Self { - Self { - min: Arc::new(min.into_iter().collect::()), - max: Arc::new(max.into_iter().collect::()), - null_counts: None, - } + Self::new() + .with_min(Arc::new(min.into_iter().collect::())) + .with_max(Arc::new(max.into_iter().collect::())) } fn min(&self) -> Option { - Some(self.min.clone()) + self.min.clone() } fn max(&self) -> Option { - Some(self.max.clone()) + self.max.clone() } fn null_counts(&self) -> Option { self.null_counts.clone() } + /// return an iterator over all arrays in this statistics + fn arrays(&self) -> Vec { + let contained_arrays = self + .contained + .iter() + .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef); + + [ + self.min.as_ref().cloned(), + self.max.as_ref().cloned(), + self.null_counts.as_ref().cloned(), + ] + .into_iter() + .flatten() + .chain(contained_arrays) + .collect() + } + fn len(&self) -> usize { - assert_eq!(self.min.len(), self.max.len()); - self.min.len() + // pick the first non zero length + self.arrays().iter().map(|a| a.len()).next().unwrap_or(0) + } + + /// Ensure that the lengths of all arrays are consistent + fn assert_invariants(&self) { + let mut prev_len = None; + + for len in self.arrays().iter().map(|a| a.len()) { + // Get a length, if we don't already have one + match prev_len { + None => { + prev_len = Some(len); + } + Some(prev_len) => { + assert_eq!(prev_len, len); + } + } + } + } + + /// Add min values + fn with_min(mut self, min: ArrayRef) -> Self { + self.min = Some(min); + self + } + + /// Add max values + fn with_max(mut self, max: ArrayRef) -> Self { + self.max = Some(max); + self } /// Add null counts. There must be the same number of null counts as @@ -1090,14 +1256,36 @@ mod tests { mut self, counts: impl IntoIterator>, ) -> Self { - // take stats out and update them let null_counts: ArrayRef = Arc::new(counts.into_iter().collect::()); - assert_eq!(null_counts.len(), self.len()); + self.assert_invariants(); self.null_counts = Some(null_counts); self } + + /// Add contained informaation. + pub fn with_contained( + mut self, + values: impl IntoIterator, + contained: impl IntoIterator>, + ) -> Self { + let contained: BooleanArray = contained.into_iter().collect(); + let values: HashSet<_> = values.into_iter().collect(); + + self.contained.push((values, contained)); + self.assert_invariants(); + self + } + + /// get any contained information for the specified values + fn contained(&self, find_values: &HashSet) -> Option { + // find the one with the matching values + self.contained + .iter() + .find(|(values, _contained)| values == find_values) + .map(|(_values, contained)| contained.clone()) + } } #[derive(Debug, Default)] @@ -1135,13 +1323,34 @@ mod tests { let container_stats = self .stats .remove(&col) - .expect("Can not find stats for column") + .unwrap_or_default() .with_null_counts(counts); // put stats back in self.stats.insert(col, container_stats); self } + + /// Add contained informaation for the specified columm. + fn with_contained( + mut self, + name: impl Into, + values: impl IntoIterator, + contained: impl IntoIterator>, + ) -> Self { + let col = Column::from_name(name.into()); + + // take stats out and update them + let container_stats = self + .stats + .remove(&col) + .unwrap_or_default() + .with_contained(values, contained); + + // put stats back in + self.stats.insert(col, container_stats); + self + } } impl PruningStatistics for TestStatistics { @@ -1173,6 +1382,16 @@ mod tests { .map(|container_stats| container_stats.null_counts()) .unwrap_or(None) } + + fn contained( + &self, + column: &Column, + values: &HashSet, + ) -> Option { + self.stats + .get(column) + .and_then(|container_stats| container_stats.contained(values)) + } } /// Returns the specified min/max container values @@ -1198,12 +1417,20 @@ mod tests { fn null_counts(&self, _column: &Column) -> Option { None } + + fn contained( + &self, + _column: &Column, + _values: &HashSet, + ) -> Option { + None + } } #[test] fn test_build_statistics_record_batch() { // Request a record batch with of s1_min, s2_max, s3_max, s3_min - let required_columns = RequiredStatColumns::from(vec![ + let required_columns = RequiredColumns::from(vec![ // min of original column s1, named s1_min ( phys_expr::Column::new("s1", 1), @@ -1275,7 +1502,7 @@ mod tests { // which is what Parquet does // Request a record batch with of s1_min as a timestamp - let required_columns = RequiredStatColumns::from(vec![( + let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s3", 3), StatisticsType::Min, Field::new( @@ -1307,7 +1534,7 @@ mod tests { #[test] fn test_build_statistics_no_required_stats() { - let required_columns = RequiredStatColumns::new(); + let required_columns = RequiredColumns::new(); let statistics = OneContainerStats { min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))), @@ -1325,7 +1552,7 @@ mod tests { // Test requesting a Utf8 column when the stats return some other type // Request a record batch with of s1_min as a timestamp - let required_columns = RequiredStatColumns::from(vec![( + let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s3", 3), StatisticsType::Min, Field::new("s1_min", DataType::Utf8, true), @@ -1354,7 +1581,7 @@ mod tests { #[test] fn test_build_statistics_inconsistent_length() { // return an inconsistent length to the actual statistics arrays - let required_columns = RequiredStatColumns::from(vec![( + let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s1", 3), StatisticsType::Min, Field::new("s1_min", DataType::Int64, true), @@ -1385,20 +1612,14 @@ mod tests { // test column on the left let expr = col("c1").eq(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).eq(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1411,20 +1632,14 @@ mod tests { // test column on the left let expr = col("c1").not_eq(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).not_eq(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1437,20 +1652,14 @@ mod tests { // test column on the left let expr = col("c1").gt(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).lt(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1463,19 +1672,13 @@ mod tests { // test column on the left let expr = col("c1").gt_eq(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).lt_eq(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1488,20 +1691,14 @@ mod tests { // test column on the left let expr = col("c1").lt(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).gt(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1514,19 +1711,13 @@ mod tests { // test column on the left let expr = col("c1").lt_eq(lit(1)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(1).gt_eq(col("c1")); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1542,11 +1733,8 @@ mod tests { // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3"))); let expected_expr = "c1_min@0 < 1"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1561,11 +1749,8 @@ mod tests { // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0))); let expected_expr = "true"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1577,11 +1762,8 @@ mod tests { let expected_expr = "true"; let expr = col("c1").not(); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1593,11 +1775,8 @@ mod tests { let expected_expr = "NOT c1_min@0 AND c1_max@1"; let expr = col("c1").not(); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1609,11 +1788,8 @@ mod tests { let expected_expr = "c1_min@0 OR c1_max@1"; let expr = col("c1"); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1627,11 +1803,8 @@ mod tests { // DF doesn't support arithmetic on boolean columns so // this predicate will error when evaluated let expr = col("c1").lt(lit(true)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1643,7 +1816,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), ]); - let mut required_columns = RequiredStatColumns::new(); + let mut required_columns = RequiredColumns::new(); // c1 < 1 and (c2 = 2 or c2 = 3) let expr = col("c1") .lt(lit(1)) @@ -1659,7 +1832,7 @@ mod tests { ( phys_expr::Column::new("c1", 0), StatisticsType::Min, - c1_min_field + c1_min_field.with_nullable(true) // could be nullable if stats are not present ) ); // c2 = 2 should add c2_min and c2_max @@ -1669,7 +1842,7 @@ mod tests { ( phys_expr::Column::new("c2", 1), StatisticsType::Min, - c2_min_field + c2_min_field.with_nullable(true) // could be nullable if stats are not present ) ); let c2_max_field = Field::new("c2_max", DataType::Int32, false); @@ -1678,7 +1851,7 @@ mod tests { ( phys_expr::Column::new("c2", 1), StatisticsType::Max, - c2_max_field + c2_max_field.with_nullable(true) // could be nullable if stats are not present ) ); // c2 = 3 shouldn't add any new statistics fields @@ -1700,11 +1873,8 @@ mod tests { false, )); let expected_expr = "c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_min@0 <= 3 AND 3 <= c1_max@1"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1719,11 +1889,8 @@ mod tests { // test c1 in() let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false)); let expected_expr = "true"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1744,11 +1911,8 @@ mod tests { let expected_expr = "(c1_min@0 != 1 OR 1 != c1_max@1) \ AND (c1_min@0 != 2 OR 2 != c1_max@1) \ AND (c1_min@0 != 3 OR 3 != c1_max@1)"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1762,20 +1926,14 @@ mod tests { // test column on the left let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1)))); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); let expected_expr = "TRY_CAST(c1_max@0 AS Int64) > 1"; @@ -1783,21 +1941,15 @@ mod tests { // test column on the left let expr = try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1)))); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); // test column on the right let expr = lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64)); - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -1817,11 +1969,8 @@ mod tests { false, )); let expected_expr = "CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); let expr = Expr::InList(InList::new( @@ -1837,11 +1986,8 @@ mod tests { "(CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) \ AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) \ AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))"; - let predicate_expr = test_build_predicate_expression( - &expr, - &schema, - &mut RequiredStatColumns::new(), - ); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); assert_eq!(predicate_expr.to_string(), expected_expr); Ok(()) @@ -2484,10 +2630,466 @@ mod tests { // TODO: add other negative test for other case and op } + #[test] + fn prune_with_contained_one_column() { + let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)])); + + // Model having information like a bloom filter for s1 + let statistics = TestStatistics::new() + .with_contained( + "s1", + [ScalarValue::from("foo")], + [ + // container 0 known to only contain "foo"", + Some(true), + // container 1 known to not contain "foo" + Some(false), + // container 2 unknown about "foo" + None, + // container 3 known to only contain "foo" + Some(true), + // container 4 known to not contain "foo" + Some(false), + // container 5 unknown about "foo" + None, + // container 6 known to only contain "foo" + Some(true), + // container 7 known to not contain "foo" + Some(false), + // container 8 unknown about "foo" + None, + ], + ) + .with_contained( + "s1", + [ScalarValue::from("bar")], + [ + // containers 0,1,2 known to only contain "bar" + Some(true), + Some(true), + Some(true), + // container 3,4,5 known to not contain "bar" + Some(false), + Some(false), + Some(false), + // container 6,7,8 unknown about "bar" + None, + None, + None, + ], + ) + .with_contained( + // the way the tests are setup, this data is + // consulted if the "foo" and "bar" are being checked at the same time + "s1", + [ScalarValue::from("foo"), ScalarValue::from("bar")], + [ + // container 0,1,2 unknown about ("foo, "bar") + None, + None, + None, + // container 3,4,5 known to contain only either "foo" and "bar" + Some(true), + Some(true), + Some(true), + // container 6,7,8 known ro contain neither "foo" and "bar" + Some(false), + Some(false), + Some(false), + ], + ); + + // s1 = 'foo' + prune_with_expr( + col("s1").eq(lit("foo")), + &schema, + &statistics, + // rule out containers ('false) where we know foo is not present + vec![true, false, true, true, false, true, true, false, true], + ); + + // s1 = 'bar' + prune_with_expr( + col("s1").eq(lit("bar")), + &schema, + &statistics, + // rule out containers where we know bar is not present + vec![true, true, true, false, false, false, true, true, true], + ); + + // s1 = 'baz' (unknown value) + prune_with_expr( + col("s1").eq(lit("baz")), + &schema, + &statistics, + // can't rule out anything + vec![true, true, true, true, true, true, true, true, true], + ); + + // s1 = 'foo' AND s1 = 'bar' + prune_with_expr( + col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))), + &schema, + &statistics, + // logically this predicate can't possibly be true (the column can't + // take on both values) but we could rule it out if the stats tell + // us that both values are not present + vec![true, true, true, true, true, true, true, true, true], + ); + + // s1 = 'foo' OR s1 = 'bar' + prune_with_expr( + col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))), + &schema, + &statistics, + // can rule out containers that we know contain neither foo nor bar + vec![true, true, true, true, true, true, false, false, false], + ); + + // s1 = 'foo' OR s1 = 'baz' + prune_with_expr( + col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))), + &schema, + &statistics, + // can't rule out anything container + vec![true, true, true, true, true, true, true, true, true], + ); + + // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz' + prune_with_expr( + col("s1") + .eq(lit("foo")) + .or(col("s1").eq(lit("bar"))) + .or(col("s1").eq(lit("baz"))), + &schema, + &statistics, + // can rule out any containers based on knowledge of s1 and `foo`, + // `bar` and (`foo`, `bar`) + vec![true, true, true, true, true, true, true, true, true], + ); + + // s1 != foo + prune_with_expr( + col("s1").not_eq(lit("foo")), + &schema, + &statistics, + // rule out containers we know for sure only contain foo + vec![false, true, true, false, true, true, false, true, true], + ); + + // s1 != bar + prune_with_expr( + col("s1").not_eq(lit("bar")), + &schema, + &statistics, + // rule out when we know for sure s1 has the value bar + vec![false, false, false, true, true, true, true, true, true], + ); + + // s1 != foo AND s1 != bar + prune_with_expr( + col("s1") + .not_eq(lit("foo")) + .and(col("s1").not_eq(lit("bar"))), + &schema, + &statistics, + // can rule out any container where we know s1 does not have either 'foo' or 'bar' + vec![true, true, true, false, false, false, true, true, true], + ); + + // s1 != foo AND s1 != bar AND s1 != baz + prune_with_expr( + col("s1") + .not_eq(lit("foo")) + .and(col("s1").not_eq(lit("bar"))) + .and(col("s1").not_eq(lit("baz"))), + &schema, + &statistics, + // can't rule out any container based on knowledge of s1,s2 + vec![true, true, true, true, true, true, true, true, true], + ); + + // s1 != foo OR s1 != bar + prune_with_expr( + col("s1") + .not_eq(lit("foo")) + .or(col("s1").not_eq(lit("bar"))), + &schema, + &statistics, + // cant' rule out anything based on contains information + vec![true, true, true, true, true, true, true, true, true], + ); + + // s1 != foo OR s1 != bar OR s1 != baz + prune_with_expr( + col("s1") + .not_eq(lit("foo")) + .or(col("s1").not_eq(lit("bar"))) + .or(col("s1").not_eq(lit("baz"))), + &schema, + &statistics, + // cant' rule out anything based on contains information + vec![true, true, true, true, true, true, true, true, true], + ); + } + + #[test] + fn prune_with_contained_two_columns() { + let schema = Arc::new(Schema::new(vec![ + Field::new("s1", DataType::Utf8, true), + Field::new("s2", DataType::Utf8, true), + ])); + + // Model having information like bloom filters for s1 and s2 + let statistics = TestStatistics::new() + .with_contained( + "s1", + [ScalarValue::from("foo")], + [ + // container 0, s1 known to only contain "foo"", + Some(true), + // container 1, s1 known to not contain "foo" + Some(false), + // container 2, s1 unknown about "foo" + None, + // container 3, s1 known to only contain "foo" + Some(true), + // container 4, s1 known to not contain "foo" + Some(false), + // container 5, s1 unknown about "foo" + None, + // container 6, s1 known to only contain "foo" + Some(true), + // container 7, s1 known to not contain "foo" + Some(false), + // container 8, s1 unknown about "foo" + None, + ], + ) + .with_contained( + "s2", // for column s2 + [ScalarValue::from("bar")], + [ + // containers 0,1,2 s2 known to only contain "bar" + Some(true), + Some(true), + Some(true), + // container 3,4,5 s2 known to not contain "bar" + Some(false), + Some(false), + Some(false), + // container 6,7,8 s2 unknown about "bar" + None, + None, + None, + ], + ); + + // s1 = 'foo' + prune_with_expr( + col("s1").eq(lit("foo")), + &schema, + &statistics, + // rule out containers where we know s1 is not present + vec![true, false, true, true, false, true, true, false, true], + ); + + // s1 = 'foo' OR s2 = 'bar' + let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar"))); + prune_with_expr( + expr, + &schema, + &statistics, + // can't rule out any container (would need to prove that s1 != foo AND s2 != bar) + vec![true, true, true, true, true, true, true, true, true], + ); + + // s1 = 'foo' AND s2 != 'bar' + prune_with_expr( + col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))), + &schema, + &statistics, + // can only rule out container where we know either: + // 1. s1 doesn't have the value 'foo` or + // 2. s2 has only the value of 'bar' + vec![false, false, false, true, false, true, true, false, true], + ); + + // s1 != 'foo' AND s2 != 'bar' + prune_with_expr( + col("s1") + .not_eq(lit("foo")) + .and(col("s2").not_eq(lit("bar"))), + &schema, + &statistics, + // Can rule out any container where we know either + // 1. s1 has only the value 'foo' + // 2. s2 has only the value 'bar' + vec![false, false, false, false, true, true, false, true, true], + ); + + // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz') + prune_with_expr( + col("s1").not_eq(lit("foo")).and( + col("s2") + .not_eq(lit("bar")) + .or(col("s2").not_eq(lit("baz"))), + ), + &schema, + &statistics, + // Can rule out any container where we know s1 has only the value + // 'foo'. Can't use knowledge of s2 and bar to rule out anything + vec![false, true, true, false, true, true, false, true, true], + ); + + // s1 like '%foo%bar%' + prune_with_expr( + col("s1").like(lit("foo%bar%")), + &schema, + &statistics, + // cant rule out anything with information we know + vec![true, true, true, true, true, true, true, true, true], + ); + + // s1 like '%foo%bar%' AND s2 = 'bar' + prune_with_expr( + col("s1") + .like(lit("foo%bar%")) + .and(col("s2").eq(lit("bar"))), + &schema, + &statistics, + // can rule out any container where we know s2 does not have the value 'bar' + vec![true, true, true, false, false, false, true, true, true], + ); + + // s1 like '%foo%bar%' OR s2 = 'bar' + prune_with_expr( + col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))), + &schema, + &statistics, + // can't rule out anything (we would have to prove that both the + // like and the equality must be false) + vec![true, true, true, true, true, true, true, true, true], + ); + } + + #[test] + fn prune_with_range_and_contained() { + // Setup mimics range information for i, a bloom filter for s + let schema = Arc::new(Schema::new(vec![ + Field::new("i", DataType::Int32, true), + Field::new("s", DataType::Utf8, true), + ])); + + let statistics = TestStatistics::new() + .with( + "i", + ContainerStats::new_i32( + // Container 0, 3, 6: [-5 to 5] + // Container 1, 4, 7: [10 to 20] + // Container 2, 5, 9: unknown + vec![ + Some(-5), + Some(10), + None, + Some(-5), + Some(10), + None, + Some(-5), + Some(10), + None, + ], // min + vec![ + Some(5), + Some(20), + None, + Some(5), + Some(20), + None, + Some(5), + Some(20), + None, + ], // max + ), + ) + // Add contained information about the s and "foo" + .with_contained( + "s", + [ScalarValue::from("foo")], + [ + // container 0,1,2 known to only contain "foo" + Some(true), + Some(true), + Some(true), + // container 3,4,5 known to not contain "foo" + Some(false), + Some(false), + Some(false), + // container 6,7,8 unknown about "foo" + None, + None, + None, + ], + ); + + // i = 0 and s = 'foo' + prune_with_expr( + col("i").eq(lit(0)).and(col("s").eq(lit("foo"))), + &schema, + &statistics, + // Can rule out container where we know that either: + // 1. 0 is outside the min/max range of i + // 1. s does not contain foo + // (range is false, and contained is false) + vec![true, false, true, false, false, false, true, false, true], + ); + + // i = 0 and s != 'foo' + prune_with_expr( + col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))), + &schema, + &statistics, + // Can rule out containers where either: + // 1. 0 is outside the min/max range of i + // 2. s only contains foo + vec![false, false, false, true, false, true, true, false, true], + ); + + // i = 0 OR s = 'foo' + prune_with_expr( + col("i").eq(lit(0)).or(col("s").not_eq(lit("foo"))), + &schema, + &statistics, + // in theory could rule out containers if we had min/max values for + // s as well. But in this case we don't so we can't rule out anything + vec![true, true, true, true, true, true, true, true, true], + ); + } + + /// prunes the specified expr with the specified schema and statistics, and + /// ensures it returns expected. + /// + /// `expected` is a vector of bools, where true means the row group should + /// be kept, and false means it should be pruned. + /// + // TODO refactor other tests to use this to reduce boiler plate + fn prune_with_expr( + expr: Expr, + schema: &SchemaRef, + statistics: &TestStatistics, + expected: Vec, + ) { + println!("Pruning with expr: {}", expr); + let expr = logical2physical(&expr, schema); + let p = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let result = p.prune(statistics).unwrap(); + assert_eq!(result, expected); + } + fn test_build_predicate_expression( expr: &Expr, schema: &Schema, - required_columns: &mut RequiredStatColumns, + required_columns: &mut RequiredColumns, ) -> Arc { let expr = logical2physical(expr, schema); build_predicate_expression(&expr, schema, required_columns) From 324cb109d47a3deab041bd03be47251abfbacc94 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Dec 2023 13:37:08 -0500 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Nga Tran --- datafusion/core/src/physical_optimizer/pruning.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 5fabf6a4b501..7d2c43f1d9ee 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1264,7 +1264,7 @@ mod tests { self } - /// Add contained informaation. + /// Add contained information. pub fn with_contained( mut self, values: impl IntoIterator, @@ -1331,7 +1331,7 @@ mod tests { self } - /// Add contained informaation for the specified columm. + /// Add contained information for the specified columm. fn with_contained( mut self, name: impl Into, @@ -2692,7 +2692,7 @@ mod tests { Some(true), Some(true), Some(true), - // container 6,7,8 known ro contain neither "foo" and "bar" + // container 6,7,8 known to contain neither "foo" and "bar" Some(false), Some(false), Some(false), @@ -2932,8 +2932,8 @@ mod tests { prune_with_expr( col("s1").not_eq(lit("foo")).and( col("s2") - .not_eq(lit("bar")) - .or(col("s2").not_eq(lit("baz"))), + .eq(lit("bar")) + .or(col("s2").eq(lit("baz"))), ), &schema, &statistics, @@ -3057,7 +3057,7 @@ mod tests { // i = 0 OR s = 'foo' prune_with_expr( - col("i").eq(lit(0)).or(col("s").not_eq(lit("foo"))), + col("i").eq(lit(0)).or(col("s").eq(lit("foo"))), &schema, &statistics, // in theory could rule out containers if we had min/max values for From 3775f0fc26b444f42f34971d54e4f69d13bf82e2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Dec 2023 13:41:39 -0500 Subject: [PATCH 3/4] Add comment to len(), fix fmt --- datafusion/core/src/physical_optimizer/pruning.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 7d2c43f1d9ee..bf168f825365 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1216,6 +1216,9 @@ mod tests { .collect() } + /// Returns the number of containers represented by this statistics This + /// picks the length of the first array as all arrays must have the same + /// length (which is verified by `assert_invariants`). fn len(&self) -> usize { // pick the first non zero length self.arrays().iter().map(|a| a.len()).next().unwrap_or(0) @@ -2930,11 +2933,9 @@ mod tests { // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz') prune_with_expr( - col("s1").not_eq(lit("foo")).and( - col("s2") - .eq(lit("bar")) - .or(col("s2").eq(lit("baz"))), - ), + col("s1") + .not_eq(lit("foo")) + .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))), &schema, &statistics, // Can rule out any container where we know s1 has only the value From 71c41f2571f1d64bc8192aa5ab1cdb76ba3e2d4c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 22 Dec 2023 14:43:13 -0500 Subject: [PATCH 4/4] rename BoolVecBuilder::append* to BoolVecBuilder::combine* --- .../core/src/physical_optimizer/pruning.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index bf168f825365..79e084d7b7f1 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -248,14 +248,14 @@ impl PruningPredicate { // values in the set for the predicate to evaluate to true. // If `contained` returns false, that means the column is // not any of the values so we can prune the container - Guarantee::In => builder.append_array(&results), + Guarantee::In => builder.combine_array(&results), // `NotIn` means the values in the column must must not be // any of the values in the set for the predicate to // evaluate to true. If contained returns true, it means the // column is only in the set of values so we can prune the // container Guarantee::NotIn => { - builder.append_array(&arrow::compute::not(&results)?) + builder.combine_array(&arrow::compute::not(&results)?) } } } @@ -270,7 +270,7 @@ impl PruningPredicate { build_statistics_record_batch(statistics, &self.required_columns)?; // Evaluate the pruning predicate on that record batch and append any results to the builder - builder.append_value(self.predicate_expr.evaluate(&statistics_batch)?); + builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?); Ok(builder.build()) } @@ -336,14 +336,14 @@ impl BoolVecBuilder { } } - /// Combines result `array` for a conjunct (e.g. `AND` clause) of a + /// Combines result `array` for a conjunct (e.g. `AND` clause) of a /// predicate into the currently in progress array. /// /// Each `array` element is: /// * `true`: container has row that may pass the predicate /// * `false`: all container rows DEFINITELY DO NOT pass the predicate /// * `null`: container may or may not have rows that pass the predicate - fn append_array(&mut self, array: &BooleanArray) { + fn combine_array(&mut self, array: &BooleanArray) { assert_eq!(array.len(), self.inner.len()); for (cur, new) in self.inner.iter_mut().zip(array.iter()) { // `false` for this conjunct means we know for sure no rows could @@ -356,14 +356,14 @@ impl BoolVecBuilder { } /// Combines the results in the [`ColumnarValue`] to the currently in - /// progress array, following the same rules as [`Self::append_array`]. + /// progress array, following the same rules as [`Self::combine_array`]. /// /// # Panics /// If `value` is not boolean - fn append_value(&mut self, value: ColumnarValue) { + fn combine_value(&mut self, value: ColumnarValue) { match value { ColumnarValue::Array(array) => { - self.append_array(array.as_boolean()); + self.combine_array(array.as_boolean()); } ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => { // False means all containers can not pass the predicate