Skip to content

Commit

Permalink
blaze: supports parquet scan with adaptive batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangli20 committed Apr 21, 2024
1 parent 5d6b63a commit e9f983c
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,33 @@ impl FileOpener for ParquetOpener {
builder = builder.with_limit(limit)
}

let total_uncompressed_size = builder
.metadata()
.row_groups()
.iter()
.map(|rg| rg
.columns()
.iter()
.enumerate()
.filter(|(idx, _)| mask.leaf_included(*idx))
.map(|(_, col)| col.uncompressed_size())
.sum::<i64>())
.sum::<i64>();
let total_num_rows = builder
.metadata()
.row_groups()
.iter()
.map(|rg| rg.num_rows())
.sum::<i64>();
let adaptive_batch_mem_size = 16777216; // blaze: use 16MB batch mem size as default
let adaptive_batch_size = batch_size
.min((adaptive_batch_mem_size * total_num_rows / total_uncompressed_size.max(1)) as usize)
.max(1);
log::info!("executing parquet scan with adaptive batch size: {adaptive_batch_size}");

let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
.with_batch_size(adaptive_batch_size)
.with_row_groups(row_groups)
.build()?;

Expand Down

0 comments on commit e9f983c

Please sign in to comment.