diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 7573a263bee9c..aa74437dac699 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -18,6 +18,7 @@ //! Execution plan for reading Parquet files use fmt::Debug; +use std::collections::{HashSet, VecDeque}; use std::fmt; use std::fs; use std::ops::Range; @@ -56,12 +57,14 @@ use arrow::{ }; use bytes::Bytes; use datafusion_common::Column; +use datafusion_expr::utils::expr_to_columns; use datafusion_expr::Expr; +use datafusion_optimizer::utils::split_conjunction; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; -use log::debug; +use log::{debug, error}; use object_store::{ObjectMeta, ObjectStore}; -use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector}; +use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelector}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::basic::{ConvertedType, LogicalType}; @@ -497,37 +500,67 @@ impl FileOpener for ParquetOpener { &file_metrics, ); - if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) { - let file_offset_indexes = file_metadata.offset_indexes(); - let file_page_indexes = file_metadata.page_indexes(); - if let (Some(file_offset_indexes), Some(file_page_indexes)) = - (file_offset_indexes, file_page_indexes) - { - let mut selectors = Vec::with_capacity(row_groups.len()); - for r in &row_groups { - selectors.extend( - prune_pages_in_one_row_group( - &groups[*r], - pruning_predicate.clone(), - file_offset_indexes.get(*r), - file_page_indexes.get(*r), - &file_metrics, - ) - .map_err(|e| { - ArrowError::ParquetError(format!( - "Fail in prune_pages_in_one_row_group: {}", - e - )) - }), + if enable_page_index { + let 
page_index_predicates = extract_page_index_push_down_predicates( + &pruning_predicate, + builder.schema().clone(), + )?; + if !page_index_predicates.is_empty() { + let file_offset_indexes = file_metadata.offset_indexes(); + let file_page_indexes = file_metadata.page_indexes(); + if let (Some(file_offset_indexes), Some(file_page_indexes)) = + (file_offset_indexes, file_page_indexes) + { + let mut row_selections = + VecDeque::with_capacity(page_index_predicates.len()); + for predicate in page_index_predicates { + // `extract_page_index_push_down_predicates` only return predicate with one col. + let col_id = *predicate + .need_input_columns_ids() + .iter() + .next() + .unwrap(); + let mut selectors = Vec::with_capacity(row_groups.len()); + for r in &row_groups { + let rg_offset_indexes = file_offset_indexes.get(*r); + let rg_page_indexes = file_page_indexes.get(*r); + if let (Some(rg_page_indexes), Some(rg_offset_indexes)) = + (rg_page_indexes, rg_offset_indexes) + { + selectors.extend( + prune_pages_in_one_row_group( + &groups[*r], + &predicate, + rg_offset_indexes.get(col_id), + rg_page_indexes.get(col_id), + &file_metrics, + ) + .map_err(|e| { + ArrowError::ParquetError(format!( + "Fail in prune_pages_in_one_row_group: {}", + e + )) + }), + ); + } else { + // fallback select all rows + let all_selected = vec![RowSelector::select( + groups[*r].num_rows() as usize, + )]; + selectors.push(all_selected); + } + } + debug!( + "Use filter and page index create RowSelection {:?} from predicate:{:?}", + &selectors, predicate ); + row_selections.push_back( + selectors.into_iter().flatten().collect::>(), + ); + } + let final_selection = combine_multi_col_selection(row_selections); + builder = builder.with_row_selection(final_selection.into()); } - debug!( - "Use filter and page index create RowSelection {:?} ", - &selectors - ); - builder = builder.with_row_selection(RowSelection::from( - selectors.into_iter().flatten().collect::>(), - )); } } @@ -552,18 +585,164 @@ impl 
FileOpener for ParquetOpener { } } -// Check PruningPredicates just work on one column. -fn check_page_index_push_down_valid(predicate: &Option) -> bool { - if let Some(predicate) = predicate { - // for now we only support pushDown on one col, because each col may have different page numbers, its hard to get - // `num_containers` from `PruningStatistics`. - let cols = predicate.need_input_columns_ids(); - //Todo more specific rules - if cols.len() == 1 { - return true; +// For example: +// > ┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ +// > ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ +// > ┃ ┌──────────────┐ │ ┌──────────────┐ │ ┃ +// > ┃ │ │ │ │ │ │ ┃ +// > ┃ │ │ │ │ Page │ │ +// > │ │ │ │ │ 3 │ ┃ +// > ┃ │ │ │ │ min: "A" │ │ ┃ +// > ┃ │ │ │ │ │ max: "C" │ ┃ +// > ┃ │ Page │ │ │ first_row: 0 │ │ +// > │ │ 1 │ │ │ │ ┃ +// > ┃ │ min: 10 │ │ └──────────────┘ │ ┃ +// > ┃ │ │ max: 20 │ │ ┌──────────────┐ ┃ +// > ┃ │ first_row: 0 │ │ │ │ │ +// > │ │ │ │ │ Page │ ┃ +// > ┃ │ │ │ │ 4 │ │ ┃ +// > ┃ │ │ │ │ │ min: "D" │ ┃ +// > ┃ │ │ │ │ max: "G" │ │ +// > │ │ │ │ │first_row: 100│ ┃ +// > ┃ └──────────────┘ │ │ │ │ ┃ +// > ┃ │ ┌──────────────┐ │ │ │ ┃ +// > ┃ │ │ │ └──────────────┘ │ +// > │ │ Page │ │ ┌──────────────┐ ┃ +// > ┃ │ 2 │ │ │ │ │ ┃ +// > ┃ │ │ min: 30 │ │ │ Page │ ┃ +// > ┃ │ max: 40 │ │ │ 5 │ │ +// > │ │first_row: 200│ │ │ min: "H" │ ┃ +// > ┃ │ │ │ │ max: "Z" │ │ ┃ +// > ┃ │ │ │ │ │first_row: 250│ ┃ +// > ┃ └──────────────┘ │ │ │ │ +// > │ │ └──────────────┘ ┃ +// > ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃ +// > ┃ ColumnChunk ColumnChunk ┃ +// > ┃ A B +// > ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛ +// > +// > Total rows: 300 +// +// Given the predicate 'A > 35 AND B = "F"': +// using `extract_page_index_push_down_predicates` get two single column predicate: +// Using 'A > 35': could get RowSelector1: [ Skip(0~199), Read(200~299)] +// Using B = "F": could get RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)] +// +// As the 
Final selection is the intersection of each columns RowSelectors: +// final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)] +fn combine_multi_col_selection( + row_selections: VecDeque>, +) -> Vec { + row_selections + .into_iter() + .reduce(intersect_row_selection) + .unwrap() +} + +// Combine two row selections and return their intersection. +// For example: +// left: NNYYYYNNY +// right: NYNNNNNNY +// +// returned: NNNNNNNNY +// Adjacent selectors of the same kind are merged in the result: Select(2) + Select(1) + Skip(2) -> Select(3) + Skip(2) +pub(crate) fn intersect_row_selection( + left: Vec, + right: Vec, +) -> Vec { + let mut res = vec![]; + let mut l_iter = left.into_iter().peekable(); + let mut r_iter = right.into_iter().peekable(); + + while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) { + if a.row_count == 0 { + l_iter.next().unwrap(); + continue; + } + if b.row_count == 0 { + r_iter.next().unwrap(); + continue; + } + match (a.skip, b.skip) { + // Keep both ranges + (false, false) => { + if a.row_count < b.row_count { + res.push(RowSelector::select(a.row_count)); + b.row_count -= a.row_count; + l_iter.next().unwrap(); + } else { + res.push(RowSelector::select(b.row_count)); + a.row_count -= b.row_count; + r_iter.next().unwrap(); + } + } + // skip at least one + _ => { + if a.row_count < b.row_count { + res.push(RowSelector::skip(a.row_count)); + b.row_count -= a.row_count; + l_iter.next().unwrap(); + } else { + res.push(RowSelector::skip(b.row_count)); + a.row_count -= b.row_count; + r_iter.next().unwrap(); + } + } + } + } + if l_iter.peek().is_some() { + res.extend(l_iter); + } + if r_iter.peek().is_some() { + res.extend(r_iter); + } + // combine the adjacent same operators and last zero row count + // TODO: remove when https://github.com/apache/arrow-rs/pull/2994 is released~ + + let mut pre = res[0]; + let mut after_combine = vec![]; + for selector in res.iter_mut().skip(1) { + if selector.skip == pre.skip { + pre.row_count += selector.row_count; + }
else { + after_combine.push(pre); + pre = *selector; } } - false + if pre.row_count != 0 { + after_combine.push(pre); + } + after_combine +} + +// Extract single col pruningPredicate from input predicate for evaluating page Index. +fn extract_page_index_push_down_predicates( + predicate: &Option, + schema: SchemaRef, +) -> Result> { + let mut one_col_predicates = vec![]; + if let Some(predicate) = predicate { + let expr = predicate.logical_expr(); + // todo try use CNF rewrite when ready + let predicates = split_conjunction(expr); + let mut one_col_expr = vec![]; + predicates + .into_iter() + .try_for_each::<_, Result<()>>(|predicate| { + let mut columns: HashSet = HashSet::new(); + expr_to_columns(predicate, &mut columns)?; + if columns.len() == 1 { + one_col_expr.push(predicate); + } + Ok(()) + })?; + one_col_predicates = one_col_expr + .into_iter() + .map(|e| PruningPredicate::try_new(e.clone(), schema.clone())) + .collect::>>() + .unwrap_or_default(); + } + Ok(one_col_predicates) } /// Factory of parquet file readers. @@ -723,14 +902,11 @@ struct RowGroupPruningStatistics<'a> { parquet_schema: &'a Schema, } -/// Wraps page_index statistics in a way +/// Wraps one col page_index in one rowGroup statistics in a way /// that implements [`PruningStatistics`] struct PagesPruningStatistics<'a> { - //row_group_metadata: &'a RowGroupMetaData, - page_indexes: &'a Vec, - offset_indexes: &'a Vec>, - parquet_schema: &'a Schema, - col_id: usize, + col_page_indexes: &'a Index, + col_offset_indexes: &'a Vec, } // TODO: consolidate code with arrow-rs @@ -867,55 +1043,43 @@ macro_rules! get_null_count_values { // Extract the min or max value calling `func` from page idex macro_rules! 
get_min_max_values_for_page_index { - ($self:expr, $column:expr, $func:ident) => {{ - if let Some((col_id_index, _field)) = - $self.parquet_schema.column_with_name(&$column.name) - { - if let Some(page_index) = $self.page_indexes.get(col_id_index) { - match page_index { - Index::NONE => None, - Index::INT32(index) => { - let vec = &index.indexes; - Some(Arc::new(Int32Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - Index::INT64(index) => { - let vec = &index.indexes; - Some(Arc::new(Int64Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - Index::FLOAT(index) => { - let vec = &index.indexes; - Some(Arc::new(Float32Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - Index::DOUBLE(index) => { - let vec = &index.indexes; - Some(Arc::new(Float64Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - Index::BOOLEAN(index) => { - let vec = &index.indexes; - Some(Arc::new(BooleanArray::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - Index::INT96(_) - | Index::BYTE_ARRAY(_) - | Index::FIXED_LEN_BYTE_ARRAY(_) => { - //Todo support these type - None - } - } - } else { + ($self:expr, $func:ident) => {{ + match $self.col_page_indexes { + Index::NONE => None, + Index::INT32(index) => { + let vec = &index.indexes; + Some(Arc::new(Int32Array::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } + Index::INT64(index) => { + let vec = &index.indexes; + Some(Arc::new(Int64Array::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } + Index::FLOAT(index) => { + let vec = &index.indexes; + Some(Arc::new(Float32Array::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } + Index::DOUBLE(index) => { + let vec = &index.indexes; + Some(Arc::new(Float64Array::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } + Index::BOOLEAN(index) => { + let vec = &index.indexes; + Some(Arc::new(BooleanArray::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } 
+ Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { + //Todo support these type None } - } else { - None } }}; } @@ -957,52 +1121,40 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } impl<'a> PruningStatistics for PagesPruningStatistics<'a> { - fn min_values(&self, column: &Column) -> Option { - get_min_max_values_for_page_index!(self, column, min) + fn min_values(&self, _column: &Column) -> Option { + get_min_max_values_for_page_index!(self, min) } - fn max_values(&self, column: &Column) -> Option { - get_min_max_values_for_page_index!(self, column, max) + fn max_values(&self, _column: &Column) -> Option { + get_min_max_values_for_page_index!(self, max) } fn num_containers(&self) -> usize { - self.offset_indexes.get(self.col_id).unwrap().len() - } - - fn null_counts(&self, column: &Column) -> Option { - if let Some((col_id_index, _field)) = - self.parquet_schema.column_with_name(&column.name) - { - if let Some(page_index) = self.page_indexes.get(col_id_index) { - match page_index { - Index::NONE => None, - Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::INT32(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::INT64(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::INT96(_) - | Index::BYTE_ARRAY(_) - | Index::FIXED_LEN_BYTE_ARRAY(_) => { - // Todo support these types - None - } - } - } else { + self.col_offset_indexes.len() + } + + fn null_counts(&self, _column: &Column) -> Option { + match self.col_page_indexes { + Index::NONE => None, + Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter( + 
index.indexes.iter().map(|x| x.null_count), + ))), + Index::INT32(index) => Some(Arc::new(Int64Array::from_iter( + index.indexes.iter().map(|x| x.null_count), + ))), + Index::INT64(index) => Some(Arc::new(Int64Array::from_iter( + index.indexes.iter().map(|x| x.null_count), + ))), + Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter( + index.indexes.iter().map(|x| x.null_count), + ))), + Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter( + index.indexes.iter().map(|x| x.null_count), + ))), + Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { + // Todo support these types None } - } else { - None } } } @@ -1052,72 +1204,57 @@ fn prune_row_groups( fn prune_pages_in_one_row_group( group: &RowGroupMetaData, - predicate: Option, - offset_indexes: Option<&Vec>>, - page_indexes: Option<&Vec>, + predicate: &PruningPredicate, + col_offset_indexes: Option<&Vec>, + col_page_indexes: Option<&Index>, metrics: &ParquetFileMetrics, ) -> Result> { let num_rows = group.num_rows() as usize; - if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) = - (&predicate, offset_indexes, page_indexes) + if let (Some(col_offset_indexes), Some(col_page_indexes)) = + (col_offset_indexes, col_page_indexes) { let pruning_stats = PagesPruningStatistics { - page_indexes, - offset_indexes, - parquet_schema: predicate.schema().as_ref(), - // now we assume only support one col. 
- col_id: *predicate - .need_input_columns_ids() - .iter() - .take(1) - .next() - .unwrap(), + col_page_indexes, + col_offset_indexes, }; match predicate.prune(&pruning_stats) { Ok(values) => { let mut vec = Vec::with_capacity(values.len()); - if let Some(cols_offset_indexes) = - offset_indexes.get(pruning_stats.col_id) - { - let row_vec = - create_row_count_in_each_page(cols_offset_indexes, num_rows); - assert_eq!(row_vec.len(), values.len()); - let mut sum_row = *row_vec.first().unwrap(); - let mut selected = *values.first().unwrap(); - - for (i, &f) in values.iter().skip(1).enumerate() { - if f == selected { - sum_row += *row_vec.get(i).unwrap(); + let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows); + assert_eq!(row_vec.len(), values.len()); + let mut sum_row = *row_vec.first().unwrap(); + let mut selected = *values.first().unwrap(); + + for (i, &f) in values.iter().skip(1).enumerate() { + if f == selected { + sum_row += *row_vec.get(i).unwrap(); + } else { + let selector = if selected { + RowSelector::select(sum_row) } else { - let selector = if selected { - RowSelector::select(sum_row) - } else { - RowSelector::skip(sum_row) - }; - vec.push(selector); - sum_row = *row_vec.get(i).unwrap(); - selected = f; - } + RowSelector::skip(sum_row) + }; + vec.push(selector); + sum_row = *row_vec.get(i).unwrap(); + selected = f; } + } - let selector = if selected { - RowSelector::select(sum_row) - } else { - RowSelector::skip(sum_row) - }; - vec.push(selector); - return Ok(vec); + let selector = if selected { + RowSelector::select(sum_row) } else { - debug!("Error evaluating page index predicate values missing page index col_id is{}", pruning_stats.col_id); - metrics.predicate_evaluation_errors.add(1); - } + RowSelector::skip(sum_row) + }; + vec.push(selector); + return Ok(vec); } // stats filter array could not be built - // return a closure which will not filter out any row groups + // return a result which will not filter out any pages Err(e) => { 
- debug!("Error evaluating page index predicate values {}", e); + error!("Error evaluating page index predicate values {}", e); metrics.predicate_evaluation_errors.add(1); + return Ok(vec![RowSelector::select(group.num_rows() as usize)]); } } } @@ -2052,6 +2189,65 @@ mod tests { ParquetFileMetrics::new(0, "file.parquet", &metrics) } + #[test] + fn test_combine_row_selection() { + // a size equal b size + let a = vec![ + RowSelector::select(5), + RowSelector::skip(4), + RowSelector::select(1), + ]; + let b = vec![ + RowSelector::select(8), + RowSelector::skip(1), + RowSelector::select(1), + ]; + + let res = intersect_row_selection(a, b); + assert_eq!( + res, + vec![ + RowSelector::select(5), + RowSelector::skip(4), + RowSelector::select(1) + ], + ); + + // a size larger than b size + let a = vec![ + RowSelector::select(3), + RowSelector::skip(33), + RowSelector::select(3), + RowSelector::skip(33), + ]; + let b = vec![RowSelector::select(36), RowSelector::skip(36)]; + let res = intersect_row_selection(a, b); + assert_eq!(res, vec![RowSelector::select(3), RowSelector::skip(69)]); + + // a size less than b size + let a = vec![RowSelector::select(3), RowSelector::skip(7)]; + let b = vec![ + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(2), + ]; + let res = intersect_row_selection(a, b); + assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8)]); + + let a = vec![RowSelector::select(3), RowSelector::skip(7)]; + let b = vec![ + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(2), + ]; + let res = intersect_row_selection(a, b); + assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8),]); + } + #[test] fn row_group_pruning_predicate_simple_expr() { use datafusion_expr::{col, lit}; @@ -2614,67 +2810,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn parquet_exec_with_page_index_filter() -> Result<()> { - 
let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - - let object_store_url = ObjectStoreUrl::local_filesystem(); - let store = session_ctx - .runtime_env() - .object_store(&object_store_url) - .unwrap(); - - let testdata = crate::test_util::parquet_test_data(); - let filename = format!("{}/alltypes_tiny_pages.parquet", testdata); - - let meta = local_unpartitioned_file(filename); - - let schema = ParquetFormat::default() - .infer_schema(&store, &[meta.clone()]) - .await - .unwrap(); - - let partitioned_file = PartitionedFile { - object_meta: meta, - partition_values: vec![], - range: None, - extensions: None, - }; - - // create filter month == 1; - let filter = col("month").eq(lit(1_i32)); - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url, - file_groups: vec![vec![partitioned_file]], - file_schema: schema, - statistics: Statistics::default(), - // file has 10 cols so index 12 should be month - projection: None, - limit: None, - table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), - }, - Some(filter), - None, - ); - - let parquet_exec_page_index = parquet_exec.clone().with_enable_page_index(true); - - let mut results = parquet_exec_page_index.execute(0, task_ctx)?; - - let batch = results.next().await.unwrap()?; - - // from the page index should create below RowSelection - // vec.push(RowSelector::select(312)); - // vec.push(RowSelector::skip(3330)); - // vec.push(RowSelector::select(333)); - // vec.push(RowSelector::skip(3330)); - // total 645 row - - assert_eq!(batch.num_rows(), 645); - Ok(()) - } } diff --git a/datafusion/core/tests/parquet_page_index_pruning.rs b/datafusion/core/tests/parquet_page_index_pruning.rs new file mode 100644 index 0000000000000..2a8791b6965b4 --- /dev/null +++ b/datafusion/core/tests/parquet_page_index_pruning.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::config::ConfigOptions; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::file_format::FileFormat; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use datafusion_common::Statistics; +use datafusion_expr::{col, lit, Expr}; +use object_store::path::Path; +use object_store::ObjectMeta; +use tokio_stream::StreamExt; + +async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetExec { + let object_store_url = ObjectStoreUrl::local_filesystem(); + let store = session_ctx + .runtime_env() + .object_store(&object_store_url) + .unwrap(); + + let testdata = datafusion::test_util::parquet_test_data(); + let filename = format!("{}/alltypes_tiny_pages.parquet", testdata); + + let location = Path::from_filesystem_path(filename.as_str()).unwrap(); + let metadata = std::fs::metadata(filename).expect("Local file metadata"); + let meta = ObjectMeta { + location, + last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), + size: metadata.len() as usize, + }; + + let 
schema = ParquetFormat::default() + .infer_schema(&store, &[meta.clone()]) + .await + .unwrap(); + + let partitioned_file = PartitionedFile { + object_meta: meta, + partition_values: vec![], + range: None, + extensions: None, + }; + + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store_url, + file_groups: vec![vec![partitioned_file]], + file_schema: schema, + statistics: Statistics::default(), + // file has 10 cols so index 12 should be month + projection: None, + limit: None, + table_partition_cols: vec![], + config_options: ConfigOptions::new().into_shareable(), + }, + Some(filter), + None, + ); + parquet_exec.with_enable_page_index(true) +} + +#[tokio::test] +async fn page_index_filter_one_col() { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + // 1.create filter month == 1; + let filter = col("month").eq(lit(1_i32)); + + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + + let batch = results.next().await.unwrap().unwrap(); + + // `month = 1` from the page index should create below RowSelection + // vec.push(RowSelector::select(312)); + // vec.push(RowSelector::skip(3330)); + // vec.push(RowSelector::select(333)); + // vec.push(RowSelector::skip(3330)); + // total 645 row + assert_eq!(batch.num_rows(), 645); + + // 2. 
create filter month == 1 or month == 2; + let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32))); + + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + + let batch = results.next().await.unwrap().unwrap(); + + // `month = 12` from the page index should create below RowSelection + // vec.push(RowSelector::skip(894)); + // vec.push(RowSelector::select(339)); + // vec.push(RowSelector::skip(3330)); + // vec.push(RowSelector::select(312)); + // vec.push(RowSelector::skip(2430)); + // combine with before filter total 1275 row + assert_eq!(batch.num_rows(), 1275); + + // 3. create filter month == 1 and month == 12; + let filter = col("month") + .eq(lit(1_i32)) + .and(col("month").eq(lit(12_i32))); + + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + + let batch = results.next().await; + + assert!(batch.is_none()); + + // 4.create filter 0 < month < 2 ; + let filter = col("month").gt(lit(0_i32)).and(col("month").lt(lit(2_i32))); + + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + + let batch = results.next().await.unwrap().unwrap(); + + // should same with `month = 1` + assert_eq!(batch.num_rows(), 645); +} + +#[tokio::test] +async fn page_index_filter_multi_col() { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + // create filter month == 1 and year = 2009; + let filter = col("month").eq(lit(1_i32)).and(col("year").eq(lit(2009))); + + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + + let batch = results.next().await.unwrap().unwrap(); + + // `year = 2009` from the page index should create below RowSelection + // 
vec.push(RowSelector::select(3663)); + // vec.push(RowSelector::skip(3642)); + // combine with `month = 1` total 333 row + assert_eq!(batch.num_rows(), 333); + + // create filter (year = 2009 or id = 1) and month = 1; + // this should only use `month = 1` to evaluate the page index. + let filter = col("month") + .eq(lit(1_i32)) + .and(col("year").eq(lit(2009)).or(col("id").eq(lit(1)))); + + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + + let batch = results.next().await.unwrap().unwrap(); + assert_eq!(batch.num_rows(), 645); + + // create filter (year = 2009 or id = 1) + // this filter use two columns will not push down + let filter = col("year").eq(lit(2009)).or(col("id").eq(lit(1))); + + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + + let batch = results.next().await.unwrap().unwrap(); + assert_eq!(batch.num_rows(), 7300); + + // create filter (year = 2009 and id = 1) or (year = 2010) + // this filter use two columns will not push down + // todo but after use CNF rewrite it could rewrite to (year = 2009 or year = 2010) and (id = 1 or year = 2010) + // which could push (year = 2009 or year = 2010) down. + let filter = col("year") + .eq(lit(2009)) + .and(col("id").eq(lit(1))) + .or(col("year").eq(lit(2010))); + + let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + + let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); + + let batch = results.next().await.unwrap().unwrap(); + assert_eq!(batch.num_rows(), 7300); +}