From b3e17e7fe5f2749eb6f48bc6b91244302772df7a Mon Sep 17 00:00:00 2001 From: yi wang <48236141+my-vegetable-has-exploded@users.noreply.github.com> Date: Wed, 10 Jan 2024 04:18:35 +0800 Subject: [PATCH] feat: Add bloom filter metric to ParquetExec (#8772) * sbbf metric for parquet. * fix tests. * integration-test * fix clippy & tests * fix clippy. * add more comments * rename make_int32_range * update metric name. --- .../physical_plan/parquet/metrics.rs | 17 ++++-- .../datasource/physical_plan/parquet/mod.rs | 13 ++++- .../physical_plan/parquet/row_groups.rs | 13 ++--- .../core/src/physical_optimizer/pruning.rs | 5 ++ datafusion/core/tests/parquet/mod.rs | 36 ++++++++++-- .../core/tests/parquet/row_group_pruning.rs | 57 ++++++++++++++++++- datafusion/core/tests/sql/explain_analyze.rs | 6 +- .../physical-expr/src/utils/guarantee.rs | 34 +++++++++++ .../test_files/repartition_scan.slt | 8 +-- 9 files changed, 164 insertions(+), 25 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs b/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs index 915fb56680f5..a17a3c6d9752 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs @@ -29,8 +29,10 @@ use crate::physical_plan::metrics::{ pub struct ParquetFileMetrics { /// Number of times the predicate could not be evaluated pub predicate_evaluation_errors: Count, - /// Number of row groups pruned using - pub row_groups_pruned: Count, + /// Number of row groups pruned by bloom filters + pub row_groups_pruned_bloom_filter: Count, + /// Number of row groups pruned by statistics + pub row_groups_pruned_statistics: Count, /// Total number of bytes scanned pub bytes_scanned: Count, /// Total rows filtered out by predicates pushed into parquet scan @@ -54,9 +56,13 @@ impl ParquetFileMetrics { .with_new_label("filename", filename.to_string()) .counter("predicate_evaluation_errors", partition); - let row_groups_pruned = MetricBuilder::new(metrics) + let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .counter("row_groups_pruned", partition); + .counter("row_groups_pruned_bloom_filter", partition); + + let row_groups_pruned_statistics = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .counter("row_groups_pruned_statistics", partition); let bytes_scanned = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) @@ -79,7 +85,8 @@ impl ParquetFileMetrics { Self { predicate_evaluation_errors, - row_groups_pruned, + row_groups_pruned_bloom_filter, + row_groups_pruned_statistics, bytes_scanned, pushdown_rows_filtered, pushdown_eval_time, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 9d81d8d083c2..84b312520161 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -51,6 +51,7 @@ use datafusion_physical_expr::{ use bytes::Bytes; use futures::future::BoxFuture; use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; use log::debug; use object_store::path::Path; use object_store::ObjectStore; @@ -278,7 +279,17 @@ impl DisplayAs for ParquetExec { let pruning_predicate_string = self .pruning_predicate .as_ref() - .map(|pre| format!(", pruning_predicate={}", pre.predicate_expr())) + .map(|pre| { + format!( + ", pruning_predicate={}, required_guarantees=[{}]", + pre.predicate_expr(), + pre.literal_guarantees() + .iter() + .map(|item| format!("{}", item)) + .collect_vec() + .join(", ") + ) + }) .unwrap_or_default(); write!(f, "ParquetExec: ")?; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 24c65423dd4c..bdc4f26f3c76 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -81,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics( Ok(values) => { // NB: false means don't scan row group if !values[0] { - metrics.row_groups_pruned.add(1); + metrics.row_groups_pruned_statistics.add(1); continue; } } @@ -159,7 +159,7 @@ pub(crate) async fn prune_row_groups_by_bloom_filters< }; if prune_group { - metrics.row_groups_pruned.add(1); + metrics.row_groups_pruned_bloom_filter.add(1); } else { filtered.push(*idx); } @@ -1049,12 +1049,9 @@ mod tests { let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]); let expr = col(r#""String""#).in_list( - vec![ - lit("Hello_Not_Exists"), - lit("Hello_Not_Exists2"), - lit("Hello_Not_Exists3"), - lit("Hello_Not_Exist4"), - ], + (1..25) + .map(|i| lit(format!("Hello_Not_Exists{}", i))) + .collect::>(), false, ); let expr = logical2physical(&expr, &schema); diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 0cbbaf2bf6cd..2e372547053b 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -314,6 +314,11 @@ impl PruningPredicate { &self.predicate_expr } + /// Returns a reference to the literal guarantees + pub fn literal_guarantees(&self) -> &[LiteralGuarantee] { + &self.literal_guarantees + } + /// Returns true if this pruning predicate can not prune anything. /// /// This happens if the predicate is a literal `true` and diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 943f7fdbf4ac..672498a9f84e 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -63,6 +63,7 @@ enum Scenario { Timestamps, Dates, Int32, + Int32Range, Float64, Decimal, DecimalLargePrecision, @@ -113,12 +114,24 @@ impl TestOutput { self.metric_value("predicate_evaluation_errors") } - /// The number of times the pruning predicate evaluation errors + /// The number of row_groups pruned by bloom filter + fn row_groups_pruned_bloom_filter(&self) -> Option { + self.metric_value("row_groups_pruned_bloom_filter") + } + + /// The number of row_groups pruned by statistics + fn row_groups_pruned_statistics(&self) -> Option { + self.metric_value("row_groups_pruned_statistics") + } + + /// The number of row_groups pruned fn row_groups_pruned(&self) -> Option { - self.metric_value("row_groups_pruned") + self.row_groups_pruned_bloom_filter() + .zip(self.row_groups_pruned_statistics()) + .map(|(a, b)| a + b) } - /// The number of times the pruning predicate evaluation errors + /// The number of row pages pruned fn row_pages_pruned(&self) -> Option { self.metric_value("page_index_rows_filtered") } @@ -145,7 +158,11 @@ impl ContextWithParquet { mut config: SessionConfig, ) -> Self { let file = match unit { - Unit::RowGroup => make_test_file_rg(scenario).await, + Unit::RowGroup => { + let config = config.options_mut(); + config.execution.parquet.bloom_filter_enabled = true; + make_test_file_rg(scenario).await + } Unit::Page => { let config = config.options_mut(); config.execution.parquet.enable_page_index = true; @@ -360,6 +377,13 @@ fn make_int32_batch(start: i32, end: i32) -> RecordBatch { RecordBatch::try_new(schema, vec![array.clone()]).unwrap() } +fn make_int32_range(start: i32, end: i32) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); + let v = vec![start, end]; + let array = Arc::new(Int32Array::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array.clone()]).unwrap() +} + /// Return record batch with f64 vector /// /// Columns are named @@ -508,6 +532,9 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_int32_batch(5, 10), ] } + Scenario::Int32Range => { + vec![make_int32_range(0, 10), make_int32_range(200000, 300000)] + } Scenario::Float64 => { vec![ make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), @@ -565,6 +592,7 @@ async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile { let props = WriterProperties::builder() .set_max_row_group_size(5) + .set_bloom_filter_enabled(true) .build(); let batches = create_data_batch(scenario); diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 7b5470fe350a..2bc5bd3f1ca7 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -20,6 +20,7 @@ //! expected. use datafusion::prelude::SessionConfig; use datafusion_common::ScalarValue; +use itertools::Itertools; use crate::parquet::Unit::RowGroup; use crate::parquet::{ContextWithParquet, Scenario}; @@ -48,6 +49,38 @@ async fn test_prune( ); } +/// check row group pruning by bloom filter and statistics independently +async fn test_prune_verbose( + case_data_type: Scenario, + sql: &str, + expected_errors: Option, + expected_row_group_pruned_sbbf: Option, + expected_row_group_pruned_statistics: Option, + expected_results: usize, +) { + let output = ContextWithParquet::new(case_data_type, RowGroup) + .await + .query(sql) + .await; + + println!("{}", output.description()); + assert_eq!(output.predicate_evaluation_errors(), expected_errors); + assert_eq!( + output.row_groups_pruned_bloom_filter(), + expected_row_group_pruned_sbbf + ); + assert_eq!( + output.row_groups_pruned_statistics(), + expected_row_group_pruned_statistics + ); + assert_eq!( + output.result_rows, + expected_results, + "{}", + output.description() + ); +} + #[tokio::test] async fn prune_timestamps_nanos() { test_prune( @@ -336,16 +369,38 @@ async fn prune_int32_eq_in_list() { #[tokio::test] async fn prune_int32_eq_in_list_2() { // result of sql "SELECT * FROM t where in (1000)", prune all - test_prune( + // test whether statistics works + test_prune_verbose( Scenario::Int32, "SELECT * FROM t where i in (1000)", Some(0), + Some(0), Some(4), 0, ) .await; } +#[tokio::test] +async fn prune_int32_eq_large_in_list() { + // result of sql "SELECT * FROM t where i in (2050...2582)", prune all + // test whether sbbf works + test_prune_verbose( + Scenario::Int32Range, + format!( + "SELECT * FROM t where i in ({})", + (200050..200082).join(",") + ) + .as_str(), + Some(0), + Some(1), + // we don't support pruning by statistics for in_list with more than 20 elements currently + Some(0), + 0, + ) + .await; +} + #[tokio::test] async fn prune_int32_eq_in_list_negated() { // result of sql "SELECT * FROM t where not in (1)" prune nothing diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 37f8cefc9080..a1d9a02cf6b1 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -738,7 +738,8 @@ async fn parquet_explain_analyze() { // should contain aggregated stats assert_contains!(&formatted, "output_rows=8"); - assert_contains!(&formatted, "row_groups_pruned=0"); + assert_contains!(&formatted, "row_groups_pruned_bloom_filter=0"); + assert_contains!(&formatted, "row_groups_pruned_statistics=0"); } #[tokio::test] @@ -754,7 +755,8 @@ async fn parquet_explain_analyze_verbose() { .to_string(); // should contain the raw per file stats (with the label) - assert_contains!(&formatted, "row_groups_pruned{partition=0"); + assert_contains!(&formatted, "row_groups_pruned_bloom_filter{partition=0"); + assert_contains!(&formatted, "row_groups_pruned_statistics{partition=0"); } #[tokio::test] diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs index 0aee2af67fdd..84b5a0ff9cb7 100644 --- a/datafusion/physical-expr/src/utils/guarantee.rs +++ b/datafusion/physical-expr/src/utils/guarantee.rs @@ -23,6 +23,7 @@ use crate::{split_conjunction, PhysicalExpr}; use datafusion_common::{Column, ScalarValue}; use datafusion_expr::Operator; use std::collections::{HashMap, HashSet}; +use std::fmt::{self, Display, Formatter}; use std::sync::Arc; /// Represents a guarantee that must be true for a boolean expression to @@ -222,6 +223,33 @@ impl LiteralGuarantee { } } +impl Display for LiteralGuarantee { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self.guarantee { + Guarantee::In => write!( + f, + "{} in ({})", + self.column.name, + self.literals + .iter() + .map(|lit| lit.to_string()) + .collect::>() + .join(", ") + ), + Guarantee::NotIn => write!( + f, + "{} not in ({})", + self.column.name, + self.literals + .iter() + .map(|lit| lit.to_string()) + .collect::>() + .join(", ") + ), + } + } +} + /// Combines conjuncts (aka terms `AND`ed together) into [`LiteralGuarantee`]s, /// preserving insert order #[derive(Debug, Default)] @@ -398,6 +426,7 @@ mod test { use datafusion_common::ToDFSchema; use datafusion_expr::expr_fn::*; use datafusion_expr::{lit, Expr}; + use itertools::Itertools; use std::sync::OnceLock; #[test] @@ -691,6 +720,11 @@ mod test { col("b").in_list(vec![lit(1), lit(2), lit(3)], true), vec![not_in_guarantee("b", [1, 2, 3])], ); + // b IN (1,2,3,4...24) + test_analyze( + col("b").in_list((1..25).map(lit).collect_vec(), false), + vec![in_guarantee("b", 1..25)], + ); } #[test] diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index 9d4951c7ecac..73487635e9cb 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -61,7 +61,7 @@ Filter: parquet_table.column1 != Int32(42) physical_plan CoalesceBatchesExec: target_batch_size=8192 --FilterExec: column1@0 != 42 -----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 +----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)] # disable round robin repartitioning statement ok @@ -77,7 +77,7 @@ Filter: parquet_table.column1 != Int32(42) physical_plan CoalesceBatchesExec: target_batch_size=8192 --FilterExec: column1@0 != 42 -----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 +----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)] # enable round robin repartitioning again statement ok @@ -102,7 +102,7 @@ SortPreservingMergeExec: [column1@0 ASC NULLS LAST] --SortExec: expr=[column1@0 ASC NULLS LAST] ----CoalesceBatchesExec: target_batch_size=8192 ------FilterExec: column1@0 != 42 ---------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 +--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)] ## Read the files as though they are ordered @@ -138,7 +138,7 @@ physical_plan SortPreservingMergeExec: [column1@0 ASC NULLS LAST] --CoalesceBatchesExec: target_batch_size=8192 ----FilterExec: column1@0 != 42 -------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 +------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)] # Cleanup statement ok