diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 5d18eac7d9fb..af63ea7124e0 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +use arrow::compute::{cast_with_options, CastOptions}; use arrow::{array::ArrayRef, datatypes::Schema}; -use arrow_array::BooleanArray; -use arrow_schema::FieldRef; -use datafusion_common::{Column, ScalarValue}; +use arrow_array::{Array, BooleanArray}; +use arrow_schema::{DataType, FieldRef}; +use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; +use datafusion_common::{Column, DataFusionError, ScalarValue}; use parquet::file::metadata::ColumnChunkMetaData; use parquet::schema::types::SchemaDescriptor; use parquet::{ @@ -276,15 +278,75 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { scalar.to_array().ok() } + /// The basic idea is to check whether all of the `values` are not within the min-max boundary. + /// If any one value is within the min-max boundary, then this row group will not be skipped. + /// Otherwise, this row group will be able to be skipped. fn contained( &self, - _column: &Column, - _values: &HashSet, + column: &Column, + values: &HashSet, ) -> Option { - None + let min_values = self.min_values(column)?; + let max_values = self.max_values(column)?; + // The boundary should be with length of 1 + if min_values.len() != max_values.len() || min_values.len() != 1 { + return None; + } + let min_value = ScalarValue::try_from_array(min_values.as_ref(), 0).ok()?; + let max_value = ScalarValue::try_from_array(max_values.as_ref(), 0).ok()?; + + // The boundary should be with the same data type + if min_value.data_type() != max_value.data_type() { + return None; + } + let target_data_type = min_value.data_type(); + + let (c, _) = self.column(&column.name)?; + let has_null = c.statistics()?.null_count() > 0; + let mut known_not_present = true; + for value in values { + // If it's null, check whether the null exists from the statistics + if has_null && value.is_null() { + known_not_present = false; + break; + } + // The filter values should be cast to the boundary's data type + let value = if let Ok(value) = + cast_scalar_value(value, &target_data_type, &DEFAULT_CAST_OPTIONS) + { + value + } else { + return None; + }; + + // If the filter value is within the boundary, will not be able to filter out this row group + if value >= min_value && value <= max_value { + known_not_present = false; + break; + } + } + + let contains = if known_not_present { Some(false) } else { None }; + + Some(BooleanArray::from(vec![contains])) } } +const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions { + safe: false, + format_options: DEFAULT_FORMAT_OPTIONS, +}; + +/// Cast scalar value to the given data type using an arrow kernel. +fn cast_scalar_value( + value: &ScalarValue, + data_type: &DataType, + cast_options: &CastOptions, +) -> Result { + let cast_array = cast_with_options(&value.to_array()?, data_type, cast_options)?; + ScalarValue::try_from_array(&cast_array, 0) +} + #[cfg(test)] mod tests { use super::*; @@ -574,12 +636,14 @@ mod tests { #[test] fn row_group_pruning_predicate_null_expr() { use datafusion_expr::{col, lit}; - // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 + let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); + + // c1 > 15 and c2 IS NULL => c1_max > 15 and bool_null_count > 0 let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); let expr = logical2physical(&expr, &schema); let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); @@ -598,19 +662,39 @@ mod tests { ), vec![1] ); + + // c1 < 5 and c2 IS NULL => c1_min < 5 and bool_null_count > 0 + let expr = col("c1").lt(lit(5)).and(col("c2").is_null()); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let groups = gen_row_group_meta_data_for_pruning_predicate(); + + let metrics = parquet_file_metrics(); + // First row group was filtered out because it contains no null value on "c2". + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &groups, + None, + Some(&pruning_predicate), + &metrics + ), + Vec::::new() + ); } #[test] fn row_group_pruning_predicate_eq_null_expr() { use datafusion_expr::{col, lit}; // test row group predicate with an unknown (Null) expr - // - // int > 1 and bool = NULL => c1_max > 1 and null let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); + + // c1 > 15 and c2 IS NULL => c1_max > 15 and bool_null_count > 0 let expr = col("c1") .gt(lit(15)) .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); @@ -632,6 +716,29 @@ mod tests { ), vec![1] ); + + // c1 < 5 and c2 IS NULL => c1_min < 5 and bool_null_count > 0 + let expr = col("c1") + .lt(lit(5)) + .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let groups = gen_row_group_meta_data_for_pruning_predicate(); + + let metrics = parquet_file_metrics(); + // bool = NULL always evaluates to NULL (and thus will not + // pass predicates. Ideally these should both be false + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &groups, + None, + Some(&pruning_predicate), + &metrics, + ), + Vec::::new() + ); } #[test] @@ -829,11 +936,6 @@ mod tests { .with_precision(18) .with_byte_len(16); let schema_descr = get_test_schema_descr(vec![field]); - // cast the type of c1 to decimal(28,3) - let left = cast(col("c1"), DataType::Decimal128(28, 3)); - let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); // we must use the big-endian when encode the i128 to bytes or vec[u8]. let rgm1 = get_row_group_meta_data( &schema_descr, @@ -854,9 +956,9 @@ mod tests { let rgm2 = get_row_group_meta_data( &schema_descr, vec![ParquetStatistics::fixed_len_byte_array( - // 5.00 + // 10.00 Some(FixedLenByteArray::from(ByteArray::from( - 500i128.to_be_bytes().to_vec(), + 1000i128.to_be_bytes().to_vec(), ))), // 200.00 Some(FixedLenByteArray::from(ByteArray::from( @@ -867,25 +969,80 @@ mod tests { false, )], ); - let rgm3 = get_row_group_meta_data( &schema_descr, vec![ParquetStatistics::fixed_len_byte_array( None, None, None, 0, false, )], ); + let rgms = [rgm1, rgm2, rgm3]; + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( &schema, &schema_descr, - &[rgm1, rgm2, rgm3], + &rgms, None, Some(&pruning_predicate), &metrics ), vec![1, 2] ); + // c1 in (10, 300, 400) + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.in_list( + vec![ + lit(ScalarValue::Decimal128(Some(8000), 28, 3)), + lit(ScalarValue::Decimal128(Some(300000), 28, 3)), + lit(ScalarValue::Decimal128(Some(400000), 28, 3)), + ], + false, + ); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &rgms, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0, 2] + ); + // c1 not in (10, 300, 400) + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.in_list( + vec![ + lit(ScalarValue::Decimal128(Some(8000), 28, 3)), + lit(ScalarValue::Decimal128(Some(300000), 28, 3)), + lit(ScalarValue::Decimal128(Some(400000), 28, 3)), + ], + true, + ); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &rgms, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0, 1, 2] + ); // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2) // the type of parquet is decimal(18,2) @@ -903,11 +1060,6 @@ mod tests { .with_precision(18) .with_byte_len(16); let schema_descr = get_test_schema_descr(vec![field]); - // cast the type of c1 to decimal(28,3) - let left = cast(col("c1"), DataType::Decimal128(28, 3)); - let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); // we must use the big-endian when encode the i128 to bytes or vec[u8]. let rgm1 = get_row_group_meta_data( &schema_descr, @@ -924,8 +1076,8 @@ mod tests { let rgm2 = get_row_group_meta_data( &schema_descr, vec![ParquetStatistics::byte_array( - // 5.00 - Some(ByteArray::from(500i128.to_be_bytes().to_vec())), + // 10.00 + Some(ByteArray::from(1000i128.to_be_bytes().to_vec())), // 200.00 Some(ByteArray::from(20000i128.to_be_bytes().to_vec())), None, @@ -937,18 +1089,74 @@ mod tests { &schema_descr, vec![ParquetStatistics::byte_array(None, None, None, 0, false)], ); + let rgms = [rgm1, rgm2, rgm3]; + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( &schema, &schema_descr, - &[rgm1, rgm2, rgm3], + &rgms, None, Some(&pruning_predicate), &metrics ), vec![1, 2] ); + // c1 in (10, 300, 400) + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.in_list( + vec![ + lit(ScalarValue::Decimal128(Some(8000), 28, 3)), + lit(ScalarValue::Decimal128(Some(300000), 28, 3)), + lit(ScalarValue::Decimal128(Some(400000), 28, 3)), + ], + false, + ); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &rgms, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0, 2] + ); + // c1 not in (10, 300, 400) + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.in_list( + vec![ + lit(ScalarValue::Decimal128(Some(8000), 28, 3)), + lit(ScalarValue::Decimal128(Some(300000), 28, 3)), + lit(ScalarValue::Decimal128(Some(400000), 28, 3)), + ], + true, + ); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &rgms, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0, 1, 2] + ); } fn get_row_group_meta_data(