diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index b42107384d97..90bf7df8d7b2 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -550,7 +550,7 @@ impl FileOpener for ParquetOpener { if enable_page_index && !row_groups.is_empty() { if let Some(p) = page_pruning_predicate { let pruned = - p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?; + p.prune(&file_schema, builder.parquet_schema(), &row_groups, file_metadata.as_ref(), &file_metrics)?; if let Some(row_selection) = pruned { builder = builder.with_row_selection(row_selection); } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 190e733b42c3..2c16473fa4bf 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -27,7 +27,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; use log::{debug, trace}; -use parquet::schema::types::ColumnDescriptor; +use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{ arrow::arrow_reader::{RowSelection, RowSelector}, errors::ParquetError, @@ -39,9 +39,10 @@ use parquet::{ }; use std::collections::HashSet; use std::sync::Arc; +use arrow_schema::Schema; use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; -use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128; +use crate::datasource::physical_plan::parquet::statistics::{from_bytes_to_i128, parquet_column}; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; @@ -128,6 +129,8 @@ impl PagePruningPredicate { /// Returns a [`RowSelection`] for the given file pub fn prune( &self, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, row_groups: &[usize], file_metadata: &ParquetMetaData, file_metrics: &ParquetFileMetrics, @@ -164,17 +167,22 @@ impl PagePruningPredicate { let mut row_selections = Vec::with_capacity(page_index_predicates.len()); for predicate in page_index_predicates { // find column index by looking in the row group metadata. - let col_idx = find_column_index(predicate, &groups[0]); + let name = find_column_name(predicate); + let mut parquet_col = None; + if name.is_some(){ + parquet_col = parquet_column(parquet_schema, arrow_schema, name.unwrap().as_str()); + } let mut selectors = Vec::with_capacity(row_groups.len()); for r in row_groups.iter() { let row_group_metadata = &groups[*r]; let rg_offset_indexes = file_offset_indexes.get(*r); let rg_page_indexes = file_page_indexes.get(*r); - if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = - (rg_page_indexes, rg_offset_indexes, col_idx) + if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_ref)) = + (rg_page_indexes, rg_offset_indexes, parquet_col) { + let col_idx = col_ref.0; selectors.extend( prune_pages_in_one_row_group( row_group_metadata, @@ -249,10 +257,9 @@ impl PagePruningPredicate { /// that `extract_page_index_push_down_predicates` only return /// predicate with one col) /// -fn find_column_index( +fn find_column_name( predicate: &PruningPredicate, - row_group_metadata: &RowGroupMetaData, -) -> Option { +) -> Option { let mut found_required_column: Option<&Column> = None; for required_column_details in predicate.required_columns().iter() { @@ -276,18 +283,7 @@ fn find_column_index( return None; }; - let col_idx = row_group_metadata - .columns() - .iter() - .enumerate() - .find(|(_idx, c)| c.column_descr().name() == column.name() && c.column_descr().path().parts().len() == 0) - .map(|(idx, _c)| idx); - - if col_idx.is_none() { - trace!("Can not find column {} in row group meta", column.name()); - } - - col_idx + Some(column.name().to_string()) } /// Intersects the [`RowSelector`]s