From 9139183b19ca29e0557bc32085396e25f046feb1 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Mon, 28 Nov 2022 23:01:58 +0800 Subject: [PATCH] Fix page index pruning fail on complex_expr (#4387) Signed-off-by: yangjiang Signed-off-by: yangjiang --- .../file_format/parquet/page_filter.rs | 70 ++++++++++--------- datafusion/core/tests/parquet/page_pruning.rs | 10 +-- 2 files changed, 38 insertions(+), 42 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index f586f3d235bd..eb5bb261667e 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -126,46 +126,50 @@ pub(crate) fn build_page_filter( let mut row_selections = VecDeque::with_capacity(page_index_predicates.len()); for predicate in page_index_predicates { // `extract_page_index_push_down_predicates` only return predicate with one col. - let col_id = *predicate.need_input_columns_ids().iter().next().unwrap(); - let mut selectors = Vec::with_capacity(row_groups.len()); - for r in row_groups.iter() { - let rg_offset_indexes = file_offset_indexes.get(*r); - let rg_page_indexes = file_page_indexes.get(*r); - if let (Some(rg_page_indexes), Some(rg_offset_indexes)) = - (rg_page_indexes, rg_offset_indexes) - { - selectors.extend( - prune_pages_in_one_row_group( - &groups[*r], - &predicate, - rg_offset_indexes.get(col_id), - rg_page_indexes.get(col_id), - groups[*r].column(col_id).column_descr(), - file_metrics, - ) - .map_err(|e| { - ArrowError::ParquetError(format!( - "Fail in prune_pages_in_one_row_group: {}", - e - )) - }), - ); - } else { - trace!( + // when building `PruningPredicate`, some single column filter like `abs(i) = 1` + // will be rewritten to `lit(true)`, so may have an empty required_columns. 
+ if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() { + let mut selectors = Vec::with_capacity(row_groups.len()); + for r in row_groups.iter() { + let rg_offset_indexes = file_offset_indexes.get(*r); + let rg_page_indexes = file_page_indexes.get(*r); + if let (Some(rg_page_indexes), Some(rg_offset_indexes)) = + (rg_page_indexes, rg_offset_indexes) + { + selectors.extend( + prune_pages_in_one_row_group( + &groups[*r], + &predicate, + rg_offset_indexes.get(col_id), + rg_page_indexes.get(col_id), + groups[*r].column(col_id).column_descr(), + file_metrics, + ) + .map_err(|e| { + ArrowError::ParquetError(format!( + "Fail in prune_pages_in_one_row_group: {}", + e + )) + }), + ); + } else { + trace!( "Did not have enough metadata to prune with page indexes, falling back, falling back to all rows", ); - // fallback select all rows - let all_selected = - vec![RowSelector::select(groups[*r].num_rows() as usize)]; - selectors.push(all_selected); + // fallback select all rows + let all_selected = + vec![RowSelector::select(groups[*r].num_rows() as usize)]; + selectors.push(all_selected); + } } - } - debug!( + debug!( "Use filter and page index create RowSelection {:?} from predicate: {:?}", &selectors, predicate.predicate_expr(), ); - row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>()); + row_selections + .push_back(selectors.into_iter().flatten().collect::<Vec<_>>()); + } } let final_selection = combine_multi_col_selection(row_selections); let total_skip = diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 8c67bc346d12..788082e37ce1 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -418,7 +418,6 @@ async fn prune_int32_eq() { .await; } #[tokio::test] -#[ignore] async fn prune_int32_scalar_fun_and_eq() { test_prune( Scenario::Int32, @@ -431,7 +430,6 @@ async fn prune_int32_scalar_fun_and_eq() { } #[tokio::test] 
-#[ignore] async fn prune_int32_scalar_fun() { test_prune( Scenario::Int32, @@ -444,7 +442,6 @@ async fn prune_int32_scalar_fun() { } #[tokio::test] -#[ignore] async fn prune_int32_complex_expr() { test_prune( Scenario::Int32, @@ -457,7 +454,6 @@ async fn prune_int32_complex_expr() { } #[tokio::test] -#[ignore] async fn prune_int32_complex_expr_subtract() { test_prune( Scenario::Int32, @@ -495,7 +491,6 @@ async fn prune_f64_lt() { } #[tokio::test] -#[ignore] async fn prune_f64_scalar_fun_and_gt() { // result of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1" // only use "f >= 0" to prune @@ -503,14 +498,13 @@ async fn prune_f64_scalar_fun_and_gt() { Scenario::Float64, "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1", Some(0), - Some(2), + Some(10), 1, ) .await; } #[tokio::test] -#[ignore] async fn prune_f64_scalar_fun() { // result of sql "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported test_prune( @@ -524,7 +518,6 @@ async fn prune_f64_scalar_fun() { } #[tokio::test] -#[ignore] async fn prune_f64_complex_expr() { // result of sql "SELECT * FROM t where f+1 > 1.1"" is not supported test_prune( @@ -538,7 +531,6 @@ async fn prune_f64_complex_expr() { } #[tokio::test] -#[ignore] async fn prune_f64_complex_expr_subtract() { // result of sql "SELECT * FROM t where 1-f > 1" is not supported test_prune(