From bd91271b884a55a8f05df9afb0c82ac098572de6 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 9 Dec 2024 04:05:37 -0800 Subject: [PATCH] Minor: Use `div_ceil` --- .../physical-plan/src/joins/cross_join.rs | 27 ++++++++++--------- .../src/joins/nested_loop_join.rs | 24 ++++++++--------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index f53fe13df15e..8bf675e87362 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -190,18 +190,21 @@ async fn load_left_input( // Load all batches and count the rows let (batches, _metrics, reservation) = stream - .try_fold((Vec::new(), metrics, reservation), |mut acc, batch| async { - let batch_size = batch.get_array_memory_size(); - // Reserve memory for incoming batch - acc.2.try_grow(batch_size)?; - // Update metrics - acc.1.build_mem_used.add(batch_size); - acc.1.build_input_batches.add(1); - acc.1.build_input_rows.add(batch.num_rows()); - // Push batch to output - acc.0.push(batch); - Ok(acc) - }) + .try_fold( + (Vec::new(), metrics, reservation), + |(mut batches, metrics, mut reservation), batch| async { + let batch_size = batch.get_array_memory_size(); + // Reserve memory for incoming batch + reservation.try_grow(batch_size)?; + // Update metrics + metrics.build_mem_used.add(batch_size); + metrics.build_input_batches.add(1); + metrics.build_input_rows.add(batch.num_rows()); + // Push batch to output + batches.push(batch); + Ok((batches, metrics, reservation)) + }, + ) .await?; let merged_batch = concat_batches(&left_schema, &batches)?; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 2beeb92da499..d174564178df 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -45,7 +45,6 @@ use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array}; use arrow::compute::concat_batches; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use arrow::util::bit_util; use datafusion_common::{ exec_datafusion_err, internal_err, JoinSide, Result, Statistics, }; @@ -440,17 +439,17 @@ async fn collect_left_input( let (batches, metrics, mut reservation) = stream .try_fold( (Vec::new(), join_metrics, reservation), - |mut acc, batch| async { + |(mut batches, metrics, mut reservation), batch| async { let batch_size = batch.get_array_memory_size(); // Reserve memory for incoming batch - acc.2.try_grow(batch_size)?; + reservation.try_grow(batch_size)?; // Update metrics - acc.1.build_mem_used.add(batch_size); - acc.1.build_input_batches.add(1); - acc.1.build_input_rows.add(batch.num_rows()); + metrics.build_mem_used.add(batch_size); + metrics.build_input_batches.add(1); + metrics.build_input_rows.add(batch.num_rows()); // Push batch to output - acc.0.push(batch); - Ok(acc) + batches.push(batch); + Ok((batches, metrics, reservation)) }, ) .await?; @@ -459,14 +458,13 @@ async fn collect_left_input( // Reserve memory for visited_left_side bitmap if required by join type let visited_left_side = if with_visited_left_side { - // TODO: Replace `ceil` wrapper with stable `div_cell` after - // https://github.com/rust-lang/rust/issues/88581 - let buffer_size = bit_util::ceil(merged_batch.num_rows(), 8); + let n_rows = merged_batch.num_rows(); + let buffer_size = n_rows.div_ceil(8); reservation.try_grow(buffer_size)?; metrics.build_mem_used.add(buffer_size); - let mut buffer = BooleanBufferBuilder::new(merged_batch.num_rows()); - buffer.append_n(merged_batch.num_rows(), false); + let mut buffer = BooleanBufferBuilder::new(n_rows); + buffer.append_n(n_rows, false); buffer } else { BooleanBufferBuilder::new(0)