Skip to content

Commit

Permalink
Fix page index pruning fail on complex_expr (#4387)
Browse files Browse the repository at this point in the history
Signed-off-by: yangjiang <[email protected]>

Signed-off-by: yangjiang <[email protected]>
  • Loading branch information
Ted-Jiang authored Nov 28, 2022
1 parent da55f93 commit 9139183
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,46 +126,50 @@ pub(crate) fn build_page_filter(
let mut row_selections = VecDeque::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only returns predicates with one column.
let col_id = *predicate.need_input_columns_ids().iter().next().unwrap();
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {}",
e
))
}),
);
} else {
trace!(
// When building a `PruningPredicate`, some single-column filters like `abs(i) = 1`
// will be rewritten to `lit(true)`, so `required_columns` may be empty.
if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {}",
e
))
}),
);
} else {
trace!(
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
}
}
debug!(
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
row_selections
.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
}
}
let final_selection = combine_multi_col_selection(row_selections);
let total_skip =
Expand Down
10 changes: 1 addition & 9 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ async fn prune_int32_eq() {
.await;
}
#[tokio::test]
#[ignore]
async fn prune_int32_scalar_fun_and_eq() {
test_prune(
Scenario::Int32,
Expand All @@ -431,7 +430,6 @@ async fn prune_int32_scalar_fun_and_eq() {
}

#[tokio::test]
#[ignore]
async fn prune_int32_scalar_fun() {
test_prune(
Scenario::Int32,
Expand All @@ -444,7 +442,6 @@ async fn prune_int32_scalar_fun() {
}

#[tokio::test]
#[ignore]
async fn prune_int32_complex_expr() {
test_prune(
Scenario::Int32,
Expand All @@ -457,7 +454,6 @@ async fn prune_int32_complex_expr() {
}

#[tokio::test]
#[ignore]
async fn prune_int32_complex_expr_subtract() {
test_prune(
Scenario::Int32,
Expand Down Expand Up @@ -495,22 +491,20 @@ async fn prune_f64_lt() {
}

#[tokio::test]
#[ignore]
async fn prune_f64_scalar_fun_and_gt() {
// result of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
// only use "f >= 0.1" to prune
test_prune(
Scenario::Float64,
"SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1",
Some(0),
Some(2),
Some(10),
1,
)
.await;
}

#[tokio::test]
#[ignore]
async fn prune_f64_scalar_fun() {
// result of sql "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported
test_prune(
Expand All @@ -524,7 +518,6 @@ async fn prune_f64_scalar_fun() {
}

#[tokio::test]
#[ignore]
async fn prune_f64_complex_expr() {
// result of sql "SELECT * FROM t where f+1 > 1.1" is not supported
test_prune(
Expand All @@ -538,7 +531,6 @@ async fn prune_f64_complex_expr() {
}

#[tokio::test]
#[ignore]
async fn prune_f64_complex_expr_subtract() {
// result of sql "SELECT * FROM t where 1-f > 1" is not supported
test_prune(
Expand Down

0 comments on commit 9139183

Please sign in to comment.