Skip to content

Commit

Permalink
use parquet_column instead of find_column_index.
Browse files Browse the repository at this point in the history
  • Loading branch information
manoj-inukolunu committed Jan 20, 2024
1 parent 0d8d91a commit 74e44e0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ impl FileOpener for ParquetOpener {
if enable_page_index && !row_groups.is_empty() {
if let Some(p) = page_pruning_predicate {
let pruned =
p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?;
p.prune(&file_schema, builder.parquet_schema(), &row_groups, file_metadata.as_ref(), &file_metrics)?;
if let Some(row_selection) = pruned {
builder = builder.with_row_selection(row_selection);
}
Expand Down
36 changes: 16 additions & 20 deletions datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use log::{debug, trace};
use parquet::schema::types::ColumnDescriptor;
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
Expand All @@ -39,9 +39,10 @@ use parquet::{
};
use std::collections::HashSet;
use std::sync::Arc;
use arrow_schema::Schema;

use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
use crate::datasource::physical_plan::parquet::statistics::{from_bytes_to_i128, parquet_column};
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::metrics::ParquetFileMetrics;
Expand Down Expand Up @@ -128,6 +129,8 @@ impl PagePruningPredicate {
/// Returns a [`RowSelection`] for the given file
pub fn prune(
&self,
arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
row_groups: &[usize],
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
Expand Down Expand Up @@ -164,17 +167,22 @@ impl PagePruningPredicate {
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// find the parquet column for the predicate by resolving its column name against the arrow and parquet schemas.
let col_idx = find_column_index(predicate, &groups[0]);

let name = find_column_name(predicate);
let mut parquet_col = None;
if name.is_some(){
parquet_col = parquet_column(parquet_schema, arrow_schema, name.unwrap().as_str());
}
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let row_group_metadata = &groups[*r];

let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) =
(rg_page_indexes, rg_offset_indexes, col_idx)
if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_ref)) =
(rg_page_indexes, rg_offset_indexes, parquet_col)
{
let col_idx = col_ref.0;
selectors.extend(
prune_pages_in_one_row_group(
row_group_metadata,
Expand Down Expand Up @@ -249,10 +257,9 @@ impl PagePruningPredicate {
/// that `extract_page_index_push_down_predicates` only returns
/// predicates with one column)
///
fn find_column_index(
fn find_column_name(
predicate: &PruningPredicate,
row_group_metadata: &RowGroupMetaData,
) -> Option<usize> {
) -> Option<String> {
let mut found_required_column: Option<&Column> = None;

for required_column_details in predicate.required_columns().iter() {
Expand All @@ -276,18 +283,7 @@ fn find_column_index(
return None;
};

let col_idx = row_group_metadata
.columns()
.iter()
.enumerate()
.find(|(_idx, c)| c.column_descr().name() == column.name() && c.column_descr().path().parts().len() == 0)
.map(|(idx, _c)| idx);

if col_idx.is_none() {
trace!("Can not find column {} in row group meta", column.name());
}

col_idx
Some(column.name().to_string())
}

/// Intersects the [`RowSelector`]s
Expand Down

0 comments on commit 74e44e0

Please sign in to comment.