Skip to content

Commit

Permalink
debug
Browse files — browse the repository at this point in the history
  • Loading branch information
zhangli20 committed Sep 19, 2024
1 parent 64abac8 commit 1cd175f
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 66 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,22 @@ version = "41.0.0"
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", features = [
arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", features = [
"prettyprint",
] }
arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false, features = [
arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false, features = [
"chrono-tz",
] }
arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false }
arrow-flight = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", features = [
arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false }
arrow-flight = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", features = [
"flight-sql-experimental",
] }
arrow-ipc = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false, features = [
arrow-ipc = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false, features = [
"lz4",
] }
arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false }
arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false }
arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false }
arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false }
arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false }
arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
Expand Down Expand Up @@ -118,7 +118,7 @@ log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.10.2", default-features = false }
parking_lot = "0.12"
parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "863233c716", default-features = false, features = [
parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f8ed6ecf56", default-features = false, features = [
"arrow",
"async",
"object_store",
Expand Down
13 changes: 0 additions & 13 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,24 +165,11 @@ impl FileOpener for ParquetOpener {
}
// If there is a predicate that can be evaluated against the metadata
if let Some(predicate) = predicate.as_ref() {

// first run without dictionary filtering, to reduce io to dictionary pages
row_groups.prune_by_statistics(
&mut builder,
&file_schema,
rg_metadata,
predicate,
false,
&file_metrics,
);

// second run with dictionary filtering
row_groups.prune_by_statistics(
&mut builder,
&file_schema,
rg_metadata,
predicate,
true,
&file_metrics,
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,42 +109,57 @@ impl RowGroupAccessPlanFilter {
arrow_schema: &Schema,
groups: &[RowGroupMetaData],
predicate: &PruningPredicate,
use_dictionary_filtering: bool,
metrics: &ParquetFileMetrics,
) {
assert_eq!(groups.len(), self.access_plan.len());
// Indexes of row groups still to scan
// blaze: to reduce reads of dictionary pages, we still prune in separate passes
let row_group_indexes = self.access_plan.row_group_indexes();
let row_group_metadatas = row_group_indexes
.iter()
.map(|&i| &groups[i])
.collect::<Vec<_>>();

let pruning_stats = RowGroupPruningStatistics {
row_group_metadatas,
arrow_schema,
use_dictionary_filtering,
cached_dictionaries: vec![OnceCell::new(); arrow_schema.fields().len()],
builder: RefCell::new(builder),
};
for &rg_idx in &row_group_indexes {
let row_group_metadatas = row_group_indexes
.iter()
.map(|&i| &groups[i])
.skip(rg_idx)
.take(1)
.collect::<Vec<_>>();

let mut pruning_stats = RowGroupPruningStatistics {
row_group_metadatas,
arrow_schema,
use_dictionary_filtering: false,
cached_dictionaries: vec![],
builder: RefCell::new(builder),
};

// try to prune the row groups in a single call
match predicate.prune(&pruning_stats) {
Ok(values) => {
// values[i] is false means the predicate could not be true for row group i
for (idx, &value) in row_group_indexes.iter().zip(values.iter()) {
if !value {
self.access_plan.skip(*idx);
metrics.row_groups_pruned_statistics.add(1);
} else {
metrics.row_groups_matched_statistics.add(1);
// prune using two runs because pruning with dictionary filtering is more expensive
// first run: prune without dictionary filtering
match predicate.prune(&pruning_stats) {
Ok(values) => {
if !values[0] {
self.access_plan.skip(rg_idx);
continue; // no need to prune with dictionary
}
}
Err(e) => {
log::info!("Error evaluating row group predicate values without dictionary {e}");
metrics.predicate_evaluation_errors.add(1);
continue; // no need to prune with dictionary
}
}
// stats filter array could not be built, so we can't prune
Err(e) => {
log::debug!("Error evaluating row group predicate values {e}");
metrics.predicate_evaluation_errors.add(1);

// second run: prune with dictionary filtering
pruning_stats.use_dictionary_filtering = true;
pruning_stats.cached_dictionaries = vec![OnceCell::new(); arrow_schema.fields().len()];
match predicate.prune(&pruning_stats) {
Ok(values) => {
if !values[0] {
self.access_plan.skip(rg_idx);
}
}
Err(e) => {
log::info!("Error evaluating row group predicate values with dictionary {e}");
metrics.predicate_evaluation_errors.add(1);
}
}
}
}
Expand Down Expand Up @@ -463,9 +478,6 @@ impl<'a, T: AsyncFileReader + Send + 'static> PruningStatistics for RowGroupPrun
if !self.use_dictionary_filtering {
return None;
}
if self.row_group_metadatas.is_empty() {
return None;
}

let col_idx = self
.row_group_metadatas
Expand All @@ -477,23 +489,24 @@ impl<'a, T: AsyncFileReader + Send + 'static> PruningStatistics for RowGroupPrun

self.cached_dictionaries[col_idx].get_or_init(|| {
let mut values = vec![];
for &row_group_metadata in &self.row_group_metadatas {
let dict_values = match futures::executor::block_on(async move {
for row_group_metadata in self.metadata_iter() {
let dict_values =
parquet::blaze::get_dictionary_for_pruning(
&mut self.builder.borrow_mut().input.0,
row_group_metadata,
col_idx,
).await
}) {
Ok(Some(dict_values)) => dict_values,
_ => return None,
};
let dict_array = ListArray::try_new(
Arc::new(Field::new("items", dict_values.data_type().clone(), false)),
OffsetBuffer::new(ScalarBuffer::from(vec![0, dict_values.len() as i32])),
dict_values,
None,
).ok()?;
)
.ok()
.flatten()?;

let dict_array =
ListArray::try_new(
Arc::new(Field::new("items", dict_values.data_type().clone(), false)),
OffsetBuffer::new(ScalarBuffer::from(vec![0, dict_values.len() as i32])),
dict_values,
None,
)
.ok()?;
values.push(dict_array);
}
arrow::compute::concat(&values.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl SCOrExpr {

impl Display for SCOrExpr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "({} &&(SC) {})", self.left, self.right)
write!(f, "({} ||(SC) {})", self.left, self.right)
}
}

Expand Down

0 comments on commit 1cd175f

Please sign in to comment.