diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index e8e186e5c45fe..1a797dbd29ae8 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -579,9 +579,33 @@ impl FileOpener for ParquetOpener {
                 builder = builder.with_limit(limit)
             }
 
+            let total_uncompressed_size = builder
+                .metadata()
+                .row_groups()
+                .iter()
+                .map(|rg| rg
+                    .columns()
+                    .iter()
+                    .enumerate()
+                    .filter(|(idx, _)| mask.leaf_included(*idx))
+                    .map(|(_, col)| col.uncompressed_size())
+                    .sum::<i64>())
+                .sum::<i64>();
+            let total_num_rows = builder
+                .metadata()
+                .row_groups()
+                .iter()
+                .map(|rg| rg.num_rows())
+                .sum::<i64>();
+            let adaptive_batch_mem_size = 16777216; // blaze: use 16MB batch mem size as default
+            let adaptive_batch_size = batch_size
+                .min((adaptive_batch_mem_size * total_num_rows / total_uncompressed_size.max(1)) as usize)
+                .max(1);
+            log::info!("executing parquet scan with adaptive batch size: {adaptive_batch_size}");
+
             let stream = builder
                 .with_projection(mask)
-                .with_batch_size(batch_size)
+                .with_batch_size(adaptive_batch_size)
                 .with_row_groups(row_groups)
                 .build()?;
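
Note (not part of the patch): below is a minimal standalone sketch of the adaptive batch sizing arithmetic introduced above, assuming the same 16 MiB-per-batch budget. The helper name and the sample row counts and byte sizes are illustrative only; in the patch the inputs come from the Parquet row-group metadata for the projected columns.

// Assumed helper reproducing the sizing formula: cap the configured batch size
// so that one batch holds roughly 16 MiB of uncompressed data, and never go
// below one row per batch.
fn adaptive_batch_size(
    batch_size: usize,
    total_num_rows: i64,
    total_uncompressed_size: i64,
) -> usize {
    const ADAPTIVE_BATCH_MEM_SIZE: i64 = 16 * 1024 * 1024; // 16 MiB, as in the patch

    batch_size
        .min((ADAPTIVE_BATCH_MEM_SIZE * total_num_rows / total_uncompressed_size.max(1)) as usize)
        .max(1)
}

fn main() {
    // Wide rows: 100_000 rows totalling 10 GiB uncompressed is roughly 105 KiB
    // per row, so the 16 MiB budget shrinks the default 8192-row batches to 156 rows.
    assert_eq!(adaptive_batch_size(8192, 100_000, 10 * 1024 * 1024 * 1024), 156);

    // Narrow rows: 1_000_000 rows totalling 1 GiB is roughly 1 KiB per row; the
    // budget would allow 15_625 rows, so the configured batch_size of 8192 wins.
    assert_eq!(adaptive_batch_size(8192, 1_000_000, 1024 * 1024 * 1024), 8192);

    // Zero rows or missing size statistics degrade to a single-row batch.
    assert_eq!(adaptive_batch_size(8192, 0, 0), 1);
}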