diff --git a/Cargo.toml b/Cargo.toml index 7a07c075bfdc..d351504420f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,22 +66,22 @@ version = "41.0.0" ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } -arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", features = [ +arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", features = [ "prettyprint", ] } -arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false, features = [ +arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false, features = [ "chrono-tz", ] } -arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false } -arrow-flight = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", features = [ +arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false } +arrow-flight = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", features = [ "flight-sql-experimental", ] } -arrow-ipc = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false, features = [ +arrow-ipc = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false, features = [ "lz4", ] } -arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false } -arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false } -arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false } +arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", 
default-features = false } +arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false } +arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false } async-trait = "0.1.73" bigdecimal = "=0.4.1" bytes = "1.4" @@ -118,7 +118,7 @@ log = "^0.4" num_cpus = "1.13.0" object_store = { version = "0.10.2", default-features = false } parking_lot = "0.12" -parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false, features = [ +parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false, features = [ "arrow", "async", "object_store", diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index f67ca8b0e76f..60b2b1d58cec 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -165,24 +165,11 @@ impl FileOpener for ParquetOpener { } // If there is a predicate that can be evaluated against the metadata if let Some(predicate) = predicate.as_ref() { - - // first run without dictionary filtering, to reduce io to dictionary pages - row_groups.prune_by_statistics( - &mut builder, - &file_schema, - rg_metadata, - predicate, - false, - &file_metrics, - ); - - // second run with dictionary filtering row_groups.prune_by_statistics( &mut builder, &file_schema, rg_metadata, predicate, - true, &file_metrics, ); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs index d8913b724dad..4d8bb8f4558e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs @@ -109,42 
+109,57 @@ impl RowGroupAccessPlanFilter { arrow_schema: &Schema, groups: &[RowGroupMetaData], predicate: &PruningPredicate, - use_dictionary_filtering: bool, metrics: &ParquetFileMetrics, ) { assert_eq!(groups.len(), self.access_plan.len()); // Indexes of row groups still to scan + // blaze: for less reading of dictionary pages, we still prune in separate passes let row_group_indexes = self.access_plan.row_group_indexes(); - let row_group_metadatas = row_group_indexes - .iter() - .map(|&i| &groups[i]) - .collect::<Vec<_>>(); - - let pruning_stats = RowGroupPruningStatistics { - row_group_metadatas, - arrow_schema, - use_dictionary_filtering, - cached_dictionaries: vec![OnceCell::new(); arrow_schema.fields().len()], - builder: RefCell::new(builder), - }; + for (pos, &rg_idx) in row_group_indexes.iter().enumerate() { + let row_group_metadatas = row_group_indexes + .iter() + .map(|&i| &groups[i]) + .skip(pos) + .take(1) + .collect::<Vec<_>>(); + + let mut pruning_stats = RowGroupPruningStatistics { + row_group_metadatas, + arrow_schema, + use_dictionary_filtering: false, + cached_dictionaries: vec![], + builder: RefCell::new(builder), + }; - // try to prune the row groups in a single call - match predicate.prune(&pruning_stats) { - Ok(values) => { - // values[i] is false means the predicate could not be true for row group i - for (idx, &value) in row_group_indexes.iter().zip(values.iter()) { - if !value { - self.access_plan.skip(*idx); - metrics.row_groups_pruned_statistics.add(1); - } else { - metrics.row_groups_matched_statistics.add(1); + // prune using two runs because pruning with dictionary filtering is more expensive + // first run: prune without dictionary filtering + match predicate.prune(&pruning_stats) { + Ok(values) => { + if !values[0] { + self.access_plan.skip(rg_idx); + continue; // no need to prune with dictionary } } + Err(e) => { + log::info!("Error evaluating row group predicate values without dictionary {e}"); + metrics.predicate_evaluation_errors.add(1); + continue; // no need to 
prune with dictionary + } } - // stats filter array could not be built, so we can't prune - Err(e) => { - log::debug!("Error evaluating row group predicate values {e}"); - metrics.predicate_evaluation_errors.add(1); + + // second run: prune with dictionary filtering + pruning_stats.use_dictionary_filtering = true; + pruning_stats.cached_dictionaries = vec![OnceCell::new(); arrow_schema.fields().len()]; + match predicate.prune(&pruning_stats) { + Ok(values) => { + if !values[0] { + self.access_plan.skip(rg_idx); + } + } + Err(e) => { + log::info!("Error evaluating row group predicate values with dictionary {e}"); + metrics.predicate_evaluation_errors.add(1); + } } } } @@ -463,9 +478,6 @@ impl<'a, T: AsyncFileReader + Send + 'static> PruningStatistics for RowGroupPrun if !self.use_dictionary_filtering { return None; } - if self.row_group_metadatas.is_empty() { - return None; - } let col_idx = self .row_group_metadatas @@ -477,23 +489,24 @@ impl<'a, T: AsyncFileReader + Send + 'static> PruningStatistics for RowGroupPrun self.cached_dictionaries[col_idx].get_or_init(|| { let mut values = vec![]; - for &row_group_metadata in &self.row_group_metadatas { - let dict_values = match futures::executor::block_on(async move { + for row_group_metadata in self.metadata_iter() { + let dict_values = parquet::blaze::get_dictionary_for_pruning( &mut self.builder.borrow_mut().input.0, row_group_metadata, col_idx, - ).await - }) { - Ok(Some(dict_values)) => dict_values, - _ => return None, - }; - let dict_array = ListArray::try_new( - Arc::new(Field::new("items", dict_values.data_type().clone(), false)), - OffsetBuffer::new(ScalarBuffer::from(vec![0, dict_values.len() as i32])), - dict_values, - None, - ).ok()?; + ) + .ok() + .flatten()?; + + let dict_array = + ListArray::try_new( + Arc::new(Field::new("items", dict_values.data_type().clone(), false)), + OffsetBuffer::new(ScalarBuffer::from(vec![0, dict_values.len() as i32])), + dict_values, + None, + ) + .ok()?; 
values.push(dict_array); } arrow::compute::concat(&values.iter() diff --git a/datafusion/physical-expr/src/expressions/short_circuiting.rs b/datafusion/physical-expr/src/expressions/short_circuiting.rs index bbb61baf86ed..926343dc3320 100644 --- a/datafusion/physical-expr/src/expressions/short_circuiting.rs +++ b/datafusion/physical-expr/src/expressions/short_circuiting.rs @@ -114,7 +114,7 @@ impl SCOrExpr { impl Display for SCOrExpr { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "({} &&(SC) {})", self.left, self.right) + write!(f, "({} ||(SC) {})", self.left, self.right) } }