Minor: Use upstream concat_batches (#11615)
alamb authored Jul 24, 2024
1 parent: c8ef545 · commit: 72c6491
Showing 3 changed files with 14 additions and 39 deletions.
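This commit removes DataFusion's private `concat_batches` wrapper (which took an explicit `row_count` argument and emitted a trace log) and calls the equivalent upstream function from the arrow crate directly. Below is a minimal sketch of the upstream API, assuming arrow's two-argument signature that derives the total row count itself; the schema and data are illustrative, not part of the commit:

    // Sketch of arrow's upstream concat_batches. Unlike the removed DataFusion
    // wrapper, it takes only the schema and the batches to combine.
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::compute::concat_batches;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> arrow::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
        )?;
        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![3, 4, 5])) as ArrayRef],
        )?;

        // previously: concat_batches(&schema, &[batch1, batch2], total_rows)?
        let combined = concat_batches(&schema, &[batch1, batch2])?;
        assert_eq!(combined.num_rows(), 5);
        Ok(())
    }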
38 changes: 7 additions & 31 deletions datafusion/physical-plan/src/coalesce_batches.rs
@@ -18,25 +18,23 @@
 //! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
 //! vectorized processing by upstream operators.
 
-use std::any::Any;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
 use crate::{
     DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
 };
+use arrow::compute::concat_batches;
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use arrow::datatypes::SchemaRef;
-use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
 use datafusion_execution::TaskContext;
 
 use futures::stream::{Stream, StreamExt};
-use log::trace;
 
 /// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
 /// vectorized processing by upstream operators.
@@ -229,11 +227,7 @@ impl CoalesceBatchesStream {
                         // check to see if we have enough batches yet
                         if self.buffered_rows >= self.target_batch_size {
                             // combine the batches and return
-                            let batch = concat_batches(
-                                &self.schema,
-                                &self.buffer,
-                                self.buffered_rows,
-                            )?;
+                            let batch = concat_batches(&self.schema, &self.buffer)?;
                             // reset buffer state
                             self.buffer.clear();
                             self.buffered_rows = 0;
@@ -250,11 +244,7 @@
                             return Poll::Ready(None);
                         } else {
                             // combine the batches and return
-                            let batch = concat_batches(
-                                &self.schema,
-                                &self.buffer,
-                                self.buffered_rows,
-                            )?;
+                            let batch = concat_batches(&self.schema, &self.buffer)?;
                             // reset buffer state
                             self.buffer.clear();
                             self.buffered_rows = 0;
@@ -276,20 +266,6 @@ impl RecordBatchStream for CoalesceBatchesStream {
     }
 }
 
-/// Concatenates an array of `RecordBatch` into one batch
-pub fn concat_batches(
-    schema: &SchemaRef,
-    batches: &[RecordBatch],
-    row_count: usize,
-) -> ArrowResult<RecordBatch> {
-    trace!(
-        "Combined {} batches containing {} rows",
-        batches.len(),
-        row_count
-    );
-    arrow::compute::concat_batches(schema, batches)
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
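For context, the coalescing logic patched above buffers input batches and tracks `buffered_rows` until `target_batch_size` is reached, then combines the buffer into one larger batch. A simplified synchronous sketch of that pattern, assuming plain iterators; `coalesce` is a hypothetical helper, whereas the real operator runs this logic inside `Stream::poll_next` with metrics and error handling:

    use arrow::compute::concat_batches;
    use arrow::datatypes::SchemaRef;
    use arrow::error::Result as ArrowResult;
    use arrow::record_batch::RecordBatch;

    fn coalesce(
        schema: &SchemaRef,
        input: impl IntoIterator<Item = RecordBatch>,
        target_batch_size: usize,
    ) -> ArrowResult<Vec<RecordBatch>> {
        let mut output = Vec::new();
        let mut buffer: Vec<RecordBatch> = Vec::new();
        let mut buffered_rows = 0;
        for batch in input {
            // add to the buffered batches, as in poll_next_inner above
            buffered_rows += batch.num_rows();
            buffer.push(batch);
            // once enough rows are buffered, combine them into one larger batch
            if buffered_rows >= target_batch_size {
                output.push(concat_batches(schema, &buffer)?);
                buffer.clear();
                buffered_rows = 0;
            }
        }
        // flush any remaining rows once the input is exhausted
        if !buffer.is_empty() {
            output.push(concat_batches(schema, &buffer)?);
        }
        Ok(output)
    }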
9 changes: 4 additions & 5 deletions datafusion/physical-plan/src/joins/cross_join.rs
@@ -18,13 +18,10 @@
 //! Defines the cross join plan for loading the left side of the cross join
 //! and producing batches in parallel for the right partitions
 
-use std::{any::Any, sync::Arc, task::Poll};
-
 use super::utils::{
     adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceAsync, OnceFut,
     StatefulStreamResult,
 };
-use crate::coalesce_batches::concat_batches;
 use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::{
@@ -33,6 +30,8 @@ use crate::{
     ExecutionPlanProperties, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use arrow::compute::concat_batches;
+use std::{any::Any, sync::Arc, task::Poll};
 
 use arrow::datatypes::{Fields, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
@@ -155,7 +154,7 @@ async fn load_left_input(
     let stream = merge.execute(0, context)?;
 
     // Load all batches and count the rows
-    let (batches, num_rows, _, reservation) = stream
+    let (batches, _num_rows, _, reservation) = stream
         .try_fold(
             (Vec::new(), 0usize, metrics, reservation),
             |mut acc, batch| async {
@@ -175,7 +174,7 @@
         )
         .await?;
 
-    let merged_batch = concat_batches(&left_schema, &batches, num_rows)?;
+    let merged_batch = concat_batches(&left_schema, &batches)?;
 
     Ok((merged_batch, reservation))
 }
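The `load_left_input` hunks above keep the fold's running row count (now bound as `_num_rows`) because the accumulator still does per-batch memory accounting, but the count no longer needs to be forwarded to `concat_batches`. A hedged sketch of the collect-then-concatenate pattern, assuming a fallible stream of batches; `collect_input` is a hypothetical stand-in that omits the join metrics and `MemoryReservation` threaded through the real accumulator:

    use arrow::compute::concat_batches;
    use arrow::datatypes::SchemaRef;
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;
    use futures::stream::TryStream;
    use futures::TryStreamExt;

    async fn collect_input(
        schema: SchemaRef,
        stream: impl TryStream<Ok = RecordBatch, Error = ArrowError>,
    ) -> Result<RecordBatch, ArrowError> {
        // Fold the stream into (buffered batches, running row count); the count
        // is still tallied here even though concat_batches now derives it.
        let (batches, _num_rows) = stream
            .try_fold((Vec::new(), 0usize), |mut acc, batch| async move {
                acc.1 += batch.num_rows();
                acc.0.push(batch);
                Ok(acc)
            })
            .await?;
        concat_batches(&schema, &batches)
    }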
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -26,7 +26,6 @@ use std::sync::Arc;
 use std::task::Poll;
 
 use super::utils::{asymmetric_join_output_partitioning, need_produce_result_in_final};
-use crate::coalesce_batches::concat_batches;
 use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::joins::utils::{
     adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
@@ -44,6 +43,7 @@ use crate::{
 use arrow::array::{
     BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
 };
+use arrow::compute::concat_batches;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util;
@@ -364,7 +364,7 @@ async fn collect_left_input(
     let stream = merge.execute(0, context)?;
 
     // Load all batches and count the rows
-    let (batches, num_rows, metrics, mut reservation) = stream
+    let (batches, _num_rows, metrics, mut reservation) = stream
         .try_fold(
             (Vec::new(), 0usize, join_metrics, reservation),
             |mut acc, batch| async {
@@ -384,7 +384,7 @@
         )
         .await?;
 
-    let merged_batch = concat_batches(&schema, &batches, num_rows)?;
+    let merged_batch = concat_batches(&schema, &batches)?;
 
     // Reserve memory for visited_left_side bitmap if required by join type
     let visited_left_side = if with_visited_left_side {
