feat: Add bloom filter metric to ParquetExec #8772

Merged: 8 commits, merged on Jan 9, 2024
17 changes: 12 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
@@ -29,8 +29,10 @@ use crate::physical_plan::metrics::{
pub struct ParquetFileMetrics {
/// Number of times the predicate could not be evaluated
pub predicate_evaluation_errors: Count,
/// Number of row groups pruned using
pub row_groups_pruned: Count,
/// Number of row groups pruned by bloom filters
pub row_groups_pruned_bloom_filter: Count,
/// Number of row groups pruned by statistics
pub row_groups_pruned_statistics: Count,
/// Total number of bytes scanned
pub bytes_scanned: Count,
/// Total rows filtered out by predicates pushed into parquet scan
@@ -54,9 +56,13 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("predicate_evaluation_errors", partition);

let row_groups_pruned = MetricBuilder::new(metrics)
let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("row_groups_pruned", partition);
.counter("row_groups_pruned_bloom_filter", partition);

let row_groups_pruned_statistics = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("row_groups_pruned_statistics", partition);

let bytes_scanned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
@@ -79,7 +85,8 @@ impl ParquetFileMetrics {

Self {
predicate_evaluation_errors,
row_groups_pruned,
row_groups_pruned_bloom_filter,
row_groups_pruned_statistics,
bytes_scanned,
pushdown_rows_filtered,
pushdown_eval_time,
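To see how the renamed counters surface, here is a minimal sketch (assuming DataFusion's `physical_plan::metrics` API as used in the diff; the filename and counts are made up) of registering and aggregating the two per-file counters:

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    let partition = 0;

    // Mirrors ParquetFileMetrics::new(): one labeled counter per pruning kind.
    let by_stats = MetricBuilder::new(&metrics)
        .with_new_label("filename", "file.parquet".to_string())
        .counter("row_groups_pruned_statistics", partition);
    let by_bloom = MetricBuilder::new(&metrics)
        .with_new_label("filename", "file.parquet".to_string())
        .counter("row_groups_pruned_bloom_filter", partition);

    by_stats.add(3);
    by_bloom.add(1);

    // Aggregated view, roughly what EXPLAIN ANALYZE folds into its metrics line.
    println!("{}", metrics.clone_inner().aggregate_by_name());
}
```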
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -51,6 +51,7 @@ use datafusion_physical_expr::{
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use object_store::path::Path;
use object_store::ObjectStore;
@@ -278,7 +279,17 @@ impl DisplayAs for ParquetExec {
let pruning_predicate_string = self
.pruning_predicate
.as_ref()
.map(|pre| format!(", pruning_predicate={}", pre.predicate_expr()))
            .map(|pre| {
                format!(
                    ", pruning_predicate={}, required_guarantees=[{}]",
                    pre.predicate_expr(),
                    pre.literal_guarantees()
                        .iter()
                        .map(|item| format!("{}", item))
                        .collect_vec()
                        .join(", ")
                )
            })
            .unwrap_or_default();

[Review comment from a Contributor on `pre.literal_guarantees()`: this is a neat idea to display guarantees 👍]

write!(f, "ParquetExec: ")?;
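The display change joins the `Display` form of each guarantee with `", "` via `itertools` (hence the new import above). A tiny self-contained check of the formatting idiom, using plain strings as stand-ins for `LiteralGuarantee` values:

```rust
use itertools::Itertools;

fn main() {
    // Stand-ins for LiteralGuarantee values; their Display impl is added
    // later in this PR (datafusion/physical-expr/src/utils/guarantee.rs).
    let guarantees = ["column1 not in (42)", "column2 in (1, 2)"];

    let rendered = format!(
        ", required_guarantees=[{}]",
        guarantees.iter().map(|g| format!("{}", g)).collect_vec().join(", ")
    );
    assert_eq!(
        rendered,
        ", required_guarantees=[column1 not in (42), column2 in (1, 2)]"
    );
}
```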
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -81,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
Ok(values) => {
// NB: false means don't scan row group
if !values[0] {
metrics.row_groups_pruned.add(1);
metrics.row_groups_pruned_statistics.add(1);
continue;
}
}
@@ -159,7 +159,7 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
};

if prune_group {
metrics.row_groups_pruned.add(1);
metrics.row_groups_pruned_bloom_filter.add(1);
} else {
filtered.push(*idx);
}
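These two hunks route each pruned row group to its own counter. Each pruned group increments exactly one counter, because whichever phase prunes a group removes it from the set the other phase sees. A minimal sketch of that composition (not DataFusion's actual code; the phase order and the keep/prune predicates are stand-ins):

```rust
#[derive(Default)]
struct Metrics {
    row_groups_pruned_statistics: usize,
    row_groups_pruned_bloom_filter: usize,
}

// Stand-in for prune_row_groups_by_statistics: min/max check replaced by `g % 2 == 0`.
fn prune_by_statistics(groups: Vec<usize>, m: &mut Metrics) -> Vec<usize> {
    groups
        .into_iter()
        .filter(|g| {
            let keep = g % 2 == 0;
            if !keep {
                m.row_groups_pruned_statistics += 1;
            }
            keep
        })
        .collect()
}

// Stand-in for prune_row_groups_by_bloom_filters: SBBF probe replaced by `g % 4 == 0`.
fn prune_by_bloom_filters(groups: Vec<usize>, m: &mut Metrics) -> Vec<usize> {
    groups
        .into_iter()
        .filter(|g| {
            let keep = g % 4 == 0;
            if !keep {
                m.row_groups_pruned_bloom_filter += 1;
            }
            keep
        })
        .collect()
}

fn main() {
    let mut m = Metrics::default();
    let survivors =
        prune_by_bloom_filters(prune_by_statistics((0..8).collect(), &mut m), &mut m);
    assert_eq!(survivors, vec![0, 4]);
    assert_eq!(m.row_groups_pruned_statistics, 4); // groups 1, 3, 5, 7
    assert_eq!(m.row_groups_pruned_bloom_filter, 2); // groups 2, 6
}
```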
@@ -1049,12 +1049,9 @@ mod tests {
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);

let expr = col(r#""String""#).in_list(
vec![
lit("Hello_Not_Exists"),
lit("Hello_Not_Exists2"),
lit("Hello_Not_Exists3"),
lit("Hello_Not_Exist4"),
],
(1..25)
.map(|i| lit(format!("Hello_Not_Exists{}", i)))
.collect::<Vec<_>>(),
false,
);
let expr = logical2physical(&expr, &schema);
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -314,6 +314,11 @@ impl PruningPredicate {
&self.predicate_expr
}

/// Returns a reference to the literal guarantees
pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
&self.literal_guarantees
}

/// Returns true if this pruning predicate can not prune anything.
///
/// This happens if the predicate is a literal `true` and
36 changes: 32 additions & 4 deletions datafusion/core/tests/parquet/mod.rs
@@ -63,6 +63,7 @@ enum Scenario {
Timestamps,
Dates,
Int32,
Int32Range,
Float64,
Decimal,
DecimalLargePrecision,
@@ -113,12 +114,24 @@ impl TestOutput {
self.metric_value("predicate_evaluation_errors")
}

/// The number of times the pruning predicate evaluation errors
/// The number of row_groups pruned by bloom filter
fn row_groups_pruned_bloom_filter(&self) -> Option<usize> {
self.metric_value("row_groups_pruned_bloom_filter")
}

/// The number of row_groups pruned by statistics
fn row_groups_pruned_statistics(&self) -> Option<usize> {
self.metric_value("row_groups_pruned_statistics")
}

/// The number of row_groups pruned
fn row_groups_pruned(&self) -> Option<usize> {
self.metric_value("row_groups_pruned")
self.row_groups_pruned_bloom_filter()
.zip(self.row_groups_pruned_statistics())
.map(|(a, b)| a + b)
}

/// The number of times the pruning predicate evaluation errors
/// The number of row pages pruned
fn row_pages_pruned(&self) -> Option<usize> {
self.metric_value("page_index_rows_filtered")
}
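The combined accessor above relies on `Option::zip`: the sum exists only when both underlying metrics were reported. A quick std-only check of that behavior:

```rust
fn main() {
    let bloom: Option<usize> = Some(1);
    let stats: Option<usize> = Some(4);
    // Both present: the totals add up.
    assert_eq!(bloom.zip(stats).map(|(a, b)| a + b), Some(5));

    // Either side missing: the combined metric is None rather than a misleading partial sum.
    let missing: Option<usize> = None;
    assert_eq!(bloom.zip(missing).map(|(a, b)| a + b), None);
}
```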
@@ -145,7 +158,11 @@ impl ContextWithParquet {
mut config: SessionConfig,
) -> Self {
let file = match unit {
Unit::RowGroup => make_test_file_rg(scenario).await,
Unit::RowGroup => {
let config = config.options_mut();
config.execution.parquet.bloom_filter_enabled = true;
make_test_file_rg(scenario).await
}
Unit::Page => {
let config = config.options_mut();
config.execution.parquet.enable_page_index = true;
@@ -360,6 +377,13 @@ fn make_int32_batch(start: i32, end: i32) -> RecordBatch {
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

fn make_int32_range(start: i32, end: i32) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let v = vec![start, end];
let array = Arc::new(Int32Array::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

/// Return record batch with f64 vector
///
/// Columns are named
@@ -508,6 +532,9 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_int32_batch(5, 10),
]
}
Scenario::Int32Range => {
vec![make_int32_range(0, 10), make_int32_range(200000, 300000)]
}
Scenario::Float64 => {
vec![
make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
@@ -565,6 +592,7 @@ async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile {

let props = WriterProperties::builder()
.set_max_row_group_size(5)
.set_bloom_filter_enabled(true)
.build();

let batches = create_data_batch(scenario);
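Bloom filters have to be written into the file before the reader can prune with them, which is why both the writer properties above and the session config gain a flag. A sketch of writing a small file with SBBFs enabled, using the `parquet` crate API from the diff (the path and data here are arbitrary, not from the PR):

```rust
use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![0, 10, 200_000, 300_000])) as ArrayRef],
    )?;

    // As in make_test_file_rg: small row groups plus bloom filters, so each
    // row group carries its own SBBF for point-lookup pruning.
    let props = WriterProperties::builder()
        .set_max_row_group_size(2)
        .set_bloom_filter_enabled(true)
        .build();

    let mut writer = ArrowWriter::try_new(
        File::create("/tmp/bloom_example.parquet")?,
        schema,
        Some(props),
    )?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```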
57 changes: 56 additions & 1 deletion datafusion/core/tests/parquet/row_group_pruning.rs
@@ -20,6 +20,7 @@
//! expected.
use datafusion::prelude::SessionConfig;
use datafusion_common::ScalarValue;
use itertools::Itertools;

use crate::parquet::Unit::RowGroup;
use crate::parquet::{ContextWithParquet, Scenario};
@@ -48,6 +49,38 @@ async fn test_prune(
);
}

/// check row group pruning by bloom filter and statistics independently
async fn test_prune_verbose(
case_data_type: Scenario,
sql: &str,
expected_errors: Option<usize>,
expected_row_group_pruned_sbbf: Option<usize>,
expected_row_group_pruned_statistics: Option<usize>,
expected_results: usize,
) {
let output = ContextWithParquet::new(case_data_type, RowGroup)
.await
.query(sql)
.await;

println!("{}", output.description());
assert_eq!(output.predicate_evaluation_errors(), expected_errors);
assert_eq!(
output.row_groups_pruned_bloom_filter(),
expected_row_group_pruned_sbbf
);
assert_eq!(
output.row_groups_pruned_statistics(),
expected_row_group_pruned_statistics
);
assert_eq!(
output.result_rows,
expected_results,
"{}",
output.description()
);
}

#[tokio::test]
async fn prune_timestamps_nanos() {
test_prune(
@@ -336,16 +369,38 @@ async fn prune_int32_eq_in_list() {
#[tokio::test]
async fn prune_int32_eq_in_list_2() {
// result of sql "SELECT * FROM t where i in (1000)", prune all
test_prune(
// test whether statistics works
test_prune_verbose(
Scenario::Int32,
"SELECT * FROM t where i in (1000)",
Some(0),
Some(0),
Some(4),
0,
)
.await;
}

#[tokio::test]
async fn prune_int32_eq_large_in_list() {
    // result of sql "SELECT * FROM t where i in (200050...200081)", prune all
    // test whether the SBBF (split block bloom filter) works
    test_prune_verbose(
        Scenario::Int32Range,
        format!(
            "SELECT * FROM t where i in ({})",
            (200050..200082).join(",")
        )
        .as_str(),
        Some(0),
        Some(1),
        // we don't support pruning by statistics for in_list with more than 20 elements currently
        Some(0),
        0,
    )
    .await;
}

[Review comment from a Contributor on the expected statistics count: Also, I think the pruning by bloom filters happens first, so it may not even try to prune by statistics. — Reply: 👍]
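`Itertools::join` renders the 32-element range directly into the SQL string; since 32 exceeds the 20-literal cutoff noted in the test, only the bloom-filter counter is expected to move. A quick check of the string being generated:

```rust
use itertools::Itertools;

fn main() {
    let list = (200050..200082).join(",");
    assert!(list.starts_with("200050,200051"));
    assert_eq!(list.split(',').count(), 32); // > 20 literals, so statistics pruning opts out
}
```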

#[tokio::test]
async fn prune_int32_eq_in_list_negated() {
// result of sql "SELECT * FROM t where not in (1)" prune nothing
6 changes: 4 additions & 2 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -738,7 +738,8 @@ async fn parquet_explain_analyze() {

// should contain aggregated stats
assert_contains!(&formatted, "output_rows=8");
assert_contains!(&formatted, "row_groups_pruned=0");
assert_contains!(&formatted, "row_groups_pruned_bloom_filter=0");
assert_contains!(&formatted, "row_groups_pruned_statistics=0");
}

#[tokio::test]
@@ -754,7 +755,8 @@ async fn parquet_explain_analyze_verbose() {
.to_string();

// should contain the raw per file stats (with the label)
assert_contains!(&formatted, "row_groups_pruned{partition=0");
assert_contains!(&formatted, "row_groups_pruned_bloom_filter{partition=0");
assert_contains!(&formatted, "row_groups_pruned_statistics{partition=0");
}
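These assertions pin the metric names as they appear in `EXPLAIN ANALYZE` output. A hedged end-to-end sketch of surfacing them from a user session (it assumes the file written in the earlier writer sketch exists at that path):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet(
        "t",
        "/tmp/bloom_example.parquet", // file from the writer sketch above
        ParquetReadOptions::default(),
    )
    .await?;

    // The ParquetExec metrics line should now include both
    // row_groups_pruned_bloom_filter=... and row_groups_pruned_statistics=...
    ctx.sql("EXPLAIN ANALYZE SELECT * FROM t WHERE i IN (42)")
        .await?
        .show()
        .await?;
    Ok(())
}
```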

34 changes: 34 additions & 0 deletions datafusion/physical-expr/src/utils/guarantee.rs
@@ -23,6 +23,7 @@ use crate::{split_conjunction, PhysicalExpr};
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::Operator;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;

/// Represents a guarantee that must be true for a boolean expression to
@@ -222,6 +223,33 @@ impl LiteralGuarantee {
}
}

impl Display for LiteralGuarantee {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self.guarantee {
Guarantee::In => write!(
f,
"{} in ({})",
self.column.name,
self.literals
.iter()
.map(|lit| lit.to_string())
.collect::<Vec<_>>()
.join(", ")
),
Guarantee::NotIn => write!(
f,
"{} not in ({})",
self.column.name,
self.literals
.iter()
.map(|lit| lit.to_string())
.collect::<Vec<_>>()
.join(", ")
),
}
}
}
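The two match arms differ only in the operator word, so the formatting logic could in principle be shared. A self-contained mirror of the added `Display` (plain strings stand in for `Column` and `ScalarValue`; this is illustration only, not the PR's code) makes that explicit:

```rust
use std::fmt::{self, Display, Formatter};

enum Guarantee {
    In,
    NotIn,
}

// Mirror of LiteralGuarantee for illustration; field names follow the diff.
struct MirrorGuarantee {
    column: String,
    guarantee: Guarantee,
    literals: Vec<String>,
}

impl Display for MirrorGuarantee {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let op = match self.guarantee {
            Guarantee::In => "in",
            Guarantee::NotIn => "not in",
        };
        write!(f, "{} {} ({})", self.column, op, self.literals.join(", "))
    }
}

fn main() {
    let g = MirrorGuarantee {
        column: "column1".into(),
        guarantee: Guarantee::NotIn,
        literals: vec!["42".into()],
    };
    // Matches the `required_guarantees=[column1 not in (42)]` text in the slt plans below.
    assert_eq!(g.to_string(), "column1 not in (42)");

    let g2 = MirrorGuarantee {
        column: "column2".into(),
        guarantee: Guarantee::In,
        literals: vec!["1".into(), "2".into()],
    };
    assert_eq!(g2.to_string(), "column2 in (1, 2)");
}
```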

/// Combines conjuncts (aka terms `AND`ed together) into [`LiteralGuarantee`]s,
/// preserving insert order
#[derive(Debug, Default)]
@@ -398,6 +426,7 @@ mod test {
use datafusion_common::ToDFSchema;
use datafusion_expr::expr_fn::*;
use datafusion_expr::{lit, Expr};
use itertools::Itertools;
use std::sync::OnceLock;

#[test]
@@ -691,6 +720,11 @@ mod test {
col("b").in_list(vec![lit(1), lit(2), lit(3)], true),
vec![not_in_guarantee("b", [1, 2, 3])],
);
// b IN (1,2,3,4...24)
test_analyze(
col("b").in_list((1..25).map(lit).collect_vec(), false),
vec![in_guarantee("b", 1..25)],
);
}

8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -61,7 +61,7 @@ Filter: parquet_table.column1 != Int32(42)
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

# disable round robin repartitioning
statement ok
@@ -77,7 +77,7 @@ Filter: parquet_table.column1 != Int32(42)
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

# enable round robin repartitioning again
statement ok
@@ -102,7 +102,7 @@ SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--SortExec: expr=[column1@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=8192
------FilterExec: column1@0 != 42
--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]


## Read the files as though they are ordered
@@ -138,7 +138,7 @@ physical_plan
SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: column1@0 != 42
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

# Cleanup
statement ok