diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 77a909731d89..610784f91dec 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -136,6 +136,7 @@ ctor = { workspace = true } doc-comment = { workspace = true } env_logger = { workspace = true } half = { workspace = true, default-features = true } +paste = "^1.0" postgres-protocol = "0.6.4" postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] } rand = { workspace = true, features = ["small_rng"] } diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 3fe51288e79a..368637d024e6 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -20,8 +20,9 @@ use arrow::array::Decimal128Array; use arrow::{ array::{ Array, ArrayRef, BinaryArray, Date32Array, Date64Array, FixedSizeBinaryArray, - Float64Array, Int32Array, StringArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, + Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, }, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, @@ -62,7 +63,7 @@ fn init() { enum Scenario { Timestamps, Dates, - Int32, + Int, Int32Range, Float64, Decimal, @@ -389,12 +390,31 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch { /// Return record batch with i32 sequence /// /// Columns are named -/// "i" -> Int32Array -fn make_int32_batch(start: i32, end: i32) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); - let v: Vec = (start..end).collect(); - let array = Arc::new(Int32Array::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array.clone()]).unwrap() +/// "i8" -> Int8Array +/// "i16" -> Int16Array +/// "i32" -> Int32Array +/// "i64" -> Int64Array +fn make_int_batches(start: 
i8, end: i8) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("i8", DataType::Int8, true), + Field::new("i16", DataType::Int16, true), + Field::new("i32", DataType::Int32, true), + Field::new("i64", DataType::Int64, true), + ])); + let v8: Vec = (start..end).collect(); + let v16: Vec = (start as _..end as _).collect(); + let v32: Vec = (start as _..end as _).collect(); + let v64: Vec = (start as _..end as _).collect(); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int8Array::from(v8)) as ArrayRef, + Arc::new(Int16Array::from(v16)) as ArrayRef, + Arc::new(Int32Array::from(v32)) as ArrayRef, + Arc::new(Int64Array::from(v64)) as ArrayRef, + ], + ) + .unwrap() } fn make_int32_range(start: i32, end: i32) -> RecordBatch { @@ -589,12 +609,12 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_date_batch(TimeDelta::try_days(3600).unwrap()), ] } - Scenario::Int32 => { + Scenario::Int => { vec![ - make_int32_batch(-5, 0), - make_int32_batch(-4, 1), - make_int32_batch(0, 5), - make_int32_batch(5, 10), + make_int_batches(-5, 0), + make_int_batches(-4, 1), + make_int_batches(0, 5), + make_int_batches(5, 10), ] } Scenario::Int32Range => { diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 3a43428f5bcf..e9e99cd3f88e 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -371,112 +371,149 @@ async fn prune_date64() { assert_eq!(output.result_rows, 1, "{}", output.description()); } -#[tokio::test] -// null count min max -// page-0 0 -5 -1 -// page-1 0 -4 0 -// page-2 0 0 4 -// page-3 0 5 9 -async fn prune_int32_lt() { - test_prune( - Scenario::Int32, - "SELECT * FROM t where i < 1", - Some(0), - Some(5), - 11, - ) - .await; - // result of sql "SELECT * FROM t where i < 1" is same as - // "SELECT * FROM t where -i > -1" - test_prune( - Scenario::Int32, - "SELECT * FROM t where -i > -1", - Some(0), - Some(5), - 11, - ) - .await; 
-} - -#[tokio::test] -async fn prune_int32_gt() { - test_prune( - Scenario::Int32, - "SELECT * FROM t where i > 8", - Some(0), - Some(15), - 1, - ) - .await; - - test_prune( - Scenario::Int32, - "SELECT * FROM t where -i < -8", - Some(0), - Some(15), - 1, - ) - .await; -} - -#[tokio::test] -async fn prune_int32_eq() { - test_prune( - Scenario::Int32, - "SELECT * FROM t where i = 1", - Some(0), - Some(15), - 1, - ) - .await; -} -#[tokio::test] -async fn prune_int32_scalar_fun_and_eq() { - test_prune( - Scenario::Int32, - "SELECT * FROM t where abs(i) = 1 and i = 1", - Some(0), - Some(15), - 1, - ) - .await; +macro_rules! int_tests { + ($bits:expr) => { + paste::item! { + #[tokio::test] + // null count min max + // page-0 0 -5 -1 + // page-1 0 -4 0 + // page-2 0 0 4 + // page-3 0 5 9 + async fn []() { + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where i{} < 1", $bits), + Some(0), + Some(5), + 11, + ) + .await; + // result of sql "SELECT * FROM t where i < 1" is same as + // "SELECT * FROM t where -i > -1" + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where -i{} > -1", $bits), + Some(0), + Some(5), + 11, + ) + .await; + } + + #[tokio::test] + async fn []() { + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where i{} > 8", $bits), + Some(0), + Some(15), + 1, + ) + .await; + + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where -i{} < -8", $bits), + Some(0), + Some(15), + 1, + ) + .await; + } + + #[tokio::test] + async fn []() { + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where i{} = 1", $bits), + Some(0), + Some(15), + 1, + ) + .await; + } + #[tokio::test] + async fn []() { + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where abs(i{}) = 1 and i{} = 1", $bits, $bits), + Some(0), + Some(15), + 1, + ) + .await; + } + + #[tokio::test] + async fn []() { + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where abs(i{}) = 1", $bits), + Some(0), + Some(0), + 3, + ) + .await; + } + + 
#[tokio::test] + async fn []() { + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where i{}+1 = 1", $bits), + Some(0), + Some(0), + 2, + ) + .await; + } + + #[tokio::test] + async fn []() { + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where 1-i{} > 1", $bits), + Some(0), + Some(0), + 9, + ) + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where in (1)" + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where i{} in (1)", $bits), + Some(0), + Some(15), + 1, + ) + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where not in (1)" prune nothing + test_prune( + Scenario::Int, + &format!("SELECT * FROM t where i{} not in (1)", $bits), + Some(0), + Some(0), + 19, + ) + .await; + } + } + } } -#[tokio::test] -async fn prune_int32_scalar_fun() { - test_prune( - Scenario::Int32, - "SELECT * FROM t where abs(i) = 1", - Some(0), - Some(0), - 3, - ) - .await; -} - -#[tokio::test] -async fn prune_int32_complex_expr() { - test_prune( - Scenario::Int32, - "SELECT * FROM t where i+1 = 1", - Some(0), - Some(0), - 2, - ) - .await; -} - -#[tokio::test] -async fn prune_int32_complex_expr_subtract() { - test_prune( - Scenario::Int32, - "SELECT * FROM t where 1-i > 1", - Some(0), - Some(0), - 9, - ) - .await; -} +int_tests!(8); +int_tests!(16); +int_tests!(32); +int_tests!(64); #[tokio::test] // null count min max @@ -556,37 +593,6 @@ async fn prune_f64_complex_expr_subtract() { .await; } -#[tokio::test] -// null count min max -// page-0 0 -5 -1 -// page-1 0 -4 0 -// page-2 0 0 4 -// page-3 0 5 9 -async fn prune_int32_eq_in_list() { - // result of sql "SELECT * FROM t where in (1)" - test_prune( - Scenario::Int32, - "SELECT * FROM t where i in (1)", - Some(0), - Some(15), - 1, - ) - .await; -} - -#[tokio::test] -async fn prune_int32_eq_in_list_negated() { - // result of sql "SELECT * FROM t where not in (1)" prune nothing - test_prune( - Scenario::Int32, - "SELECT * FROM t where i 
not in (1)", - Some(0), - Some(0), - 19, - ) - .await; -} - #[tokio::test] async fn prune_decimal_lt() { // The data type of decimal_col is decimal(9,2) diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index ed48d040648c..b70102f78a96 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -285,105 +285,191 @@ async fn prune_disabled() { ); } -#[tokio::test] -async fn prune_int32_lt() { - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where i < 1") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(3)) - .with_pruned_by_stats(Some(1)) - .with_matched_by_bloom_filter(Some(0)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(11) - .test_row_group_prune() - .await; - - // result of sql "SELECT * FROM t where i < 1" is same as - // "SELECT * FROM t where -i > -1" - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where -i > -1") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(3)) - .with_pruned_by_stats(Some(1)) - .with_matched_by_bloom_filter(Some(0)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(11) - .test_row_group_prune() - .await; -} - -#[tokio::test] -async fn prune_int32_eq() { - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where i = 1") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(1)) - .with_pruned_by_stats(Some(3)) - .with_matched_by_bloom_filter(Some(1)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(1) - .test_row_group_prune() - .await; -} -#[tokio::test] -async fn prune_int32_scalar_fun_and_eq() { - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where i = 1") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(1)) - .with_pruned_by_stats(Some(3)) 
- .with_matched_by_bloom_filter(Some(1)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(1) - .test_row_group_prune() - .await; -} - -#[tokio::test] -async fn prune_int32_scalar_fun() { - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where abs(i) = 1") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(0)) - .with_pruned_by_stats(Some(0)) - .with_matched_by_bloom_filter(Some(0)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(3) - .test_row_group_prune() - .await; +// $bits: number of bits of the integer to test (8, 16, 32, 64) +// $correct_bloom_filters: if false, replicates the +// https://github.com/apache/arrow-datafusion/issues/9779 bug so that tests pass +// if and only if Bloom filters on Int8 and Int16 columns are still buggy. +macro_rules! int_tests { + ($bits:expr, correct_bloom_filters: $correct_bloom_filters:expr) => { + paste::item! { + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where i{} < 1", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(11) + .test_row_group_prune() + .await; + + // result of sql "SELECT * FROM t where i < 1" is same as + // "SELECT * FROM t where -i > -1" + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where -i{} > -1", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(11) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where i{} = 1", $bits)) + 
.with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where i{} = 1", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where abs(i{}) = 1", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(3) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where i{}+1 = 1", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(2) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where 1-i{} > 1", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + 
.with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(9) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where in (1)" + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where i{} in (1)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where in (100)", prune all + // test whether statistics works + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where i{} in (100)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(4)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(0) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where not in (1)" prune nothing + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(&format!("SELECT * FROM t where i{} not in (1)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(4)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(4)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(19) + .test_row_group_prune() + .await; + } + } + }; } -#[tokio::test] -async fn prune_int32_complex_expr() { - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where i+1 = 1") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(0)) - .with_pruned_by_stats(Some(0))
- .with_matched_by_bloom_filter(Some(0)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(2) - .test_row_group_prune() - .await; -} +int_tests!(8, correct_bloom_filters: false); +int_tests!(16, correct_bloom_filters: false); +int_tests!(32, correct_bloom_filters: true); +int_tests!(64, correct_bloom_filters: true); #[tokio::test] -async fn prune_int32_complex_expr_subtract() { +async fn prune_int32_eq_large_in_list() { + // result of sql "SELECT * FROM t where i in (200050...200081)", prune all RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where 1-i > 1") + .with_scenario(Scenario::Int32Range) + .with_query( + format!( + "SELECT * FROM t where i in ({})", + (200050..200082).join(",") + ) + .as_str(), + ) .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(0)) + .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(0)) .with_matched_by_bloom_filter(Some(0)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(9) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(0) .test_row_group_prune() .await; } @@ -479,77 +565,6 @@ async fn prune_f64_complex_expr_subtract() { .await; } -#[tokio::test] -async fn prune_int32_eq_in_list() { - // result of sql "SELECT * FROM t where in (1)" - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where i in (1)") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(1)) - .with_pruned_by_stats(Some(3)) - .with_matched_by_bloom_filter(Some(1)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(1) - .test_row_group_prune() - .await; -} - -#[tokio::test] -async fn prune_int32_eq_in_list_2() { - // result of sql "SELECT * FROM t where in (1000)", prune all - // test whether statistics works - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where i in (1000)") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(0)) -
.with_pruned_by_stats(Some(4)) - .with_matched_by_bloom_filter(Some(0)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(0) - .test_row_group_prune() - .await; -} - -#[tokio::test] -async fn prune_int32_eq_large_in_list() { - // result of sql "SELECT * FROM t where i in (2050...2582)", prune all - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32Range) - .with_query( - format!( - "SELECT * FROM t where i in ({})", - (200050..200082).join(",") - ) - .as_str(), - ) - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(1)) - .with_pruned_by_stats(Some(0)) - .with_matched_by_bloom_filter(Some(0)) - .with_pruned_by_bloom_filter(Some(1)) - .with_expected_rows(0) - .test_row_group_prune() - .await; -} - -#[tokio::test] -async fn prune_int32_eq_in_list_negated() { - // result of sql "SELECT * FROM t where not in (1)" prune nothing - RowGroupPruningTest::new() - .with_scenario(Scenario::Int32) - .with_query("SELECT * FROM t where i not in (1)") - .with_expected_errors(Some(0)) - .with_matched_by_stats(Some(4)) - .with_pruned_by_stats(Some(0)) - .with_matched_by_bloom_filter(Some(4)) - .with_pruned_by_bloom_filter(Some(0)) - .with_expected_rows(19) - .test_row_group_prune() - .await; -} - #[tokio::test] async fn prune_decimal_lt() { // The data type of decimal_col is decimal(9,2)