Revert "refactor: improve repartition buffering (apache#4867)"
This reverts commit a9ddcd3
DDtKey committed Feb 2, 2023
1 parent b6dbb8d commit 8ae1691
Showing 2 changed files with 82 additions and 816 deletions.
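
At a glance, this revert swaps the bounded distributor channels introduced by apache#4867 back to tokio's unbounded MPSC channels: `RepartitionExec` again creates one `mpsc::unbounded_channel` per output partition, `BatchPartitioner::partition` returns to its callback form (the iterator-based `partition_iter` and the `distributor_channels` module are removed), and the output stream polls an `UnboundedReceiverStream`. The behavioral difference is on the send side, where an unbounded sender never waits. A minimal sketch of that behavior, in plain tokio rather than DataFusion code (assumes the `tokio` crate with the `rt-multi-thread` and `macros` features):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<u64>();

    // An unbounded `send` is synchronous and returns immediately; it only
    // fails once the receiver has been dropped. This is why the restored
    // `pull_from_input` can drop the `.await` on every send.
    tx.send(1).expect("receiver still alive");
    tx.send(2).expect("receiver still alive");
    drop(tx);

    // The receiver drains the buffer in order, then yields `None` once all
    // senders are gone, the same end-of-stream signal RepartitionExec uses.
    assert_eq!(rx.recv().await, Some(1));
    assert_eq!(rx.recv().await, Some(2));
    assert_eq!(rx.recv().await, None);
}
```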
@@ -26,16 +26,14 @@ use std::{any::Any, vec};
 use crate::error::{DataFusionError, Result};
 use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use crate::physical_plan::hash_utils::create_hashes;
-use crate::physical_plan::repartition::distributor_channels::channels;
 use crate::physical_plan::{
     DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics,
 };
 use arrow::array::{ArrayRef, UInt64Builder};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use log::debug;
-
-use self::distributor_channels::{DistributionReceiver, DistributionSender};
+use tokio_stream::wrappers::UnboundedReceiverStream;
 
 use super::common::{AbortOnDropMany, AbortOnDropSingle};
 use super::expressions::PhysicalSortExpr;
@@ -45,13 +43,12 @@ use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::execution::context::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::Stream;
-use futures::{FutureExt, StreamExt};
+use futures::StreamExt;
 use hashbrown::HashMap;
 use parking_lot::Mutex;
+use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
 use tokio::task::JoinHandle;
 
-mod distributor_channels;
-
 type MaybeBatch = Option<Result<RecordBatch>>;
 type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
 
@@ -63,8 +60,8 @@ struct RepartitionExecState {
     channels: HashMap<
         usize,
         (
-            DistributionSender<MaybeBatch>,
-            DistributionReceiver<MaybeBatch>,
+            UnboundedSender<MaybeBatch>,
+            UnboundedReceiver<MaybeBatch>,
             SharedMemoryReservation,
         ),
     >,
@@ -134,92 +131,67 @@ impl BatchPartitioner {
     where
         F: FnMut(usize, RecordBatch) -> Result<()>,
     {
-        self.partition_iter(batch)?.try_for_each(|res| match res {
-            Ok((partition, batch)) => f(partition, batch),
-            Err(e) => Err(e),
-        })
-    }
-
-    /// Actual implementation of [`partition`](Self::partition).
-    ///
-    /// The reason this was pulled out is that we need to have a variant of `partition` that works w/ sync functions,
-    /// and one that works w/ async. Using an iterator as an intermediate representation was the best way to achieve
-    /// this (so we don't need to clone the entire implementation).
-    fn partition_iter(
-        &mut self,
-        batch: RecordBatch,
-    ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_> {
-        let it: Box<dyn Iterator<Item = Result<(usize, RecordBatch)>> + Send> =
-            match &mut self.state {
-                BatchPartitionerState::RoundRobin {
-                    num_partitions,
-                    next_idx,
-                } => {
-                    let idx = *next_idx;
-                    *next_idx = (*next_idx + 1) % *num_partitions;
-                    Box::new(std::iter::once(Ok((idx, batch))))
-                }
-                BatchPartitionerState::Hash {
-                    random_state,
-                    exprs,
-                    num_partitions: partitions,
-                    hash_buffer,
-                } => {
-                    let timer = self.timer.timer();
-
-                    let arrays = exprs
-                        .iter()
-                        .map(|expr| {
-                            Ok(expr.evaluate(&batch)?.into_array(batch.num_rows()))
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-
-                    hash_buffer.clear();
-                    hash_buffer.resize(batch.num_rows(), 0);
-
-                    create_hashes(&arrays, random_state, hash_buffer)?;
-
-                    let mut indices: Vec<_> = (0..*partitions)
-                        .map(|_| UInt64Builder::with_capacity(batch.num_rows()))
-                        .collect();
-
-                    for (index, hash) in hash_buffer.iter().enumerate() {
-                        indices[(*hash % *partitions as u64) as usize]
-                            .append_value(index as u64);
-                    }
-
-                    let it = indices
-                        .into_iter()
-                        .enumerate()
-                        .filter_map(|(partition, mut indices)| {
-                            let indices = indices.finish();
-                            (!indices.is_empty()).then_some((partition, indices))
-                        })
-                        .map(move |(partition, indices)| {
-                            // Produce batches based on indices
-                            let columns = batch
-                                .columns()
-                                .iter()
-                                .map(|c| {
-                                    arrow::compute::take(c.as_ref(), &indices, None)
-                                        .map_err(DataFusionError::ArrowError)
-                                })
-                                .collect::<Result<Vec<ArrayRef>>>()?;
-
-                            let batch =
-                                RecordBatch::try_new(batch.schema(), columns).unwrap();
-
-                            // bind timer so it drops w/ this iterator
-                            let _ = &timer;
-
-                            Ok((partition, batch))
-                        });
-
-                    Box::new(it)
-                }
-            };
-
-        Ok(it)
+        match &mut self.state {
+            BatchPartitionerState::RoundRobin {
+                num_partitions,
+                next_idx,
+            } => {
+                let idx = *next_idx;
+                *next_idx = (*next_idx + 1) % *num_partitions;
+                f(idx, batch)?;
+            }
+            BatchPartitionerState::Hash {
+                random_state,
+                exprs,
+                num_partitions: partitions,
+                hash_buffer,
+            } => {
+                let mut timer = self.timer.timer();
+
+                let arrays = exprs
+                    .iter()
+                    .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+                    .collect::<Result<Vec<_>>>()?;
+
+                hash_buffer.clear();
+                hash_buffer.resize(batch.num_rows(), 0);
+
+                create_hashes(&arrays, random_state, hash_buffer)?;
+
+                let mut indices: Vec<_> = (0..*partitions)
+                    .map(|_| UInt64Builder::with_capacity(batch.num_rows()))
+                    .collect();
+
+                for (index, hash) in hash_buffer.iter().enumerate() {
+                    indices[(*hash % *partitions as u64) as usize]
+                        .append_value(index as u64);
+                }
+
+                for (partition, mut indices) in indices.into_iter().enumerate() {
+                    let indices = indices.finish();
+                    if indices.is_empty() {
+                        continue;
+                    }
+
+                    // Produce batches based on indices
+                    let columns = batch
+                        .columns()
+                        .iter()
+                        .map(|c| {
+                            arrow::compute::take(c.as_ref(), &indices, None)
+                                .map_err(DataFusionError::ArrowError)
+                        })
+                        .collect::<Result<Vec<ArrayRef>>>()?;
+
+                    let batch = RecordBatch::try_new(batch.schema(), columns).unwrap();
+
+                    timer.stop();
+                    f(partition, batch)?;
+                    timer.restart();
+                }
+            }
+        }
+        Ok(())
     }
 }
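
The removed doc comment above explains the pattern this hunk reverts: yield `(partition, batch)` pairs from one iterator-returning core so that a sync callback API and an async send loop can share a single implementation. A compact sketch of that idea, using a plain `Vec<u64>` as a hypothetical stand-in for `RecordBatch` and only a round-robin arm:

```rust
use tokio::sync::mpsc;

// Hypothetical stand-in for RecordBatch.
type Batch = Vec<u64>;

struct Partitioner {
    num_partitions: usize,
    next_idx: usize,
}

impl Partitioner {
    /// The single core implementation: yields (partition, batch) pairs.
    fn partition_iter(&mut self, batch: Batch) -> impl Iterator<Item = (usize, Batch)> {
        let idx = self.next_idx;
        self.next_idx = (self.next_idx + 1) % self.num_partitions;
        std::iter::once((idx, batch))
    }

    /// Sync variant: drives the iterator with a callback.
    fn partition<F: FnMut(usize, Batch)>(&mut self, batch: Batch, mut f: F) {
        for (partition, batch) in self.partition_iter(batch) {
            f(partition, batch);
        }
    }
}

/// Async variant: the same iterator, but with an `.await` between items.
async fn send_all(p: &mut Partitioner, batch: Batch, tx: mpsc::Sender<(usize, Batch)>) {
    for item in p.partition_iter(batch) {
        tx.send(item).await.ok();
    }
}

#[tokio::main]
async fn main() {
    let mut p = Partitioner { num_partitions: 2, next_idx: 0 };
    p.partition(vec![1, 2, 3], |partition, batch| {
        println!("sync: partition {partition} got {batch:?}");
    });

    let (tx, mut rx) = mpsc::channel(1);
    send_all(&mut p, vec![4, 5], tx).await;
    println!("async: {:?}", rx.recv().await);
}
```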

@@ -364,15 +336,22 @@ impl ExecutionPlan for RepartitionExec {
         // if this is the first partition to be invoked then we need to set up initial state
         if state.channels.is_empty() {
             // create one channel per *output* partition
-            // note we use a custom channel that ensures there is always data for each receiver
-            // but limits the amount of buffering if required.
-            let (txs, rxs) = channels(num_output_partitions);
-            for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
+            for partition in 0..num_output_partitions {
+                // Note that this operator uses unbounded channels to avoid deadlocks because
+                // the output partitions can be read in any order and this could cause input
+                // partitions to be blocked when sending data to output UnboundedReceivers that are not
+                // being read yet. This may cause high memory usage if the next operator is
+                // reading output partitions in order rather than concurrently. One workaround
+                // for this would be to add spill-to-disk capabilities.
+                let (sender, receiver) =
+                    mpsc::unbounded_channel::<Option<Result<RecordBatch>>>();
                 let reservation = Arc::new(Mutex::new(
                     MemoryConsumer::new(format!("RepartitionExec[{partition}]"))
                         .register(context.memory_pool()),
                 ));
-                state.channels.insert(partition, (tx, rx, reservation));
+                state
+                    .channels
+                    .insert(partition, (sender, receiver, reservation));
             }
 
             // launch one async task per *input* partition
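
The restored comment above captures the design trade: a bounded per-partition buffer can deadlock, because an input task blocked on one full output buffer can starve a consumer that is waiting on a different partition, so the operator buffers without bound and accounts for the memory instead. A sketch of the restored setup, one unbounded channel plus one shared reservation per output partition (stand-in types; the reservation is simplified to a byte counter):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

// Stand-ins for RecordBatch and SharedMemoryReservation.
type Batch = Vec<u8>;
type Reservation = Arc<Mutex<usize>>;

fn setup_channels(
    num_output_partitions: usize,
) -> HashMap<usize, (UnboundedSender<Batch>, UnboundedReceiver<Batch>, Reservation)> {
    let mut channels = HashMap::new();
    for partition in 0..num_output_partitions {
        // Unbounded: a sender never blocks, so an input task can always make
        // progress even if nobody is polling this partition yet.
        let (sender, receiver) = mpsc::unbounded_channel::<Batch>();
        // Tracks how many bytes sit in this partition's buffer.
        let reservation = Arc::new(Mutex::new(0usize));
        channels.insert(partition, (sender, receiver, reservation));
    }
    channels
}

fn main() {
    let channels = setup_channels(4);
    assert_eq!(channels.len(), 4);
}
```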
@@ -427,7 +406,7 @@ impl ExecutionPlan for RepartitionExec {
             num_input_partitions,
             num_input_partitions_processed: 0,
             schema: self.input.schema(),
-            input: rx,
+            input: UnboundedReceiverStream::new(rx),
             drop_helper: Arc::clone(&state.abort_helper),
             reservation,
         }))
@@ -485,10 +464,7 @@ impl RepartitionExec {
     async fn pull_from_input(
         input: Arc<dyn ExecutionPlan>,
         i: usize,
-        mut txs: HashMap<
-            usize,
-            (DistributionSender<MaybeBatch>, SharedMemoryReservation),
-        >,
+        mut txs: HashMap<usize, (UnboundedSender<MaybeBatch>, SharedMemoryReservation)>,
         partitioning: Partitioning,
         r_metrics: RepartitionMetrics,
         context: Arc<TaskContext>,
@@ -515,23 +491,23 @@ impl RepartitionExec {
                 None => break,
             };
 
-            for res in partitioner.partition_iter(batch)? {
-                let (partition, batch) = res?;
-                let size = batch.get_array_memory_size();
+            partitioner.partition(batch, |partition, partitioned| {
+                let size = partitioned.get_array_memory_size();
 
                 let timer = r_metrics.send_time.timer();
                 // if there is still a receiver, send to it
                 if let Some((tx, reservation)) = txs.get_mut(&partition) {
                     reservation.lock().try_grow(size)?;
 
-                    if tx.send(Some(Ok(batch))).await.is_err() {
+                    if tx.send(Some(Ok(partitioned))).is_err() {
                         // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
                         reservation.lock().shrink(size);
                         txs.remove(&partition);
                     }
                 }
                 timer.done();
-            }
+                Ok(())
+            })?;
         }
 
         Ok(())
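
Since an unbounded sender cannot apply backpressure, the restored `pull_from_input` pairs every synchronous send with explicit memory accounting: grow the partition's reservation before the send, then shrink it and drop the sender if the receiver has already hung up. A sketch of that pattern with the same stand-in types as above (not DataFusion's API):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::{self, UnboundedSender};

type Batch = Vec<u8>;
type Reservation = Arc<Mutex<usize>>;

fn send_to_partition(
    txs: &mut HashMap<usize, (UnboundedSender<Batch>, Reservation)>,
    partition: usize,
    batch: Batch,
) {
    let size = batch.len();
    // If there is still a receiver for this partition, send to it.
    if let Some((tx, reservation)) = txs.get_mut(&partition) {
        // Account for the bytes while the batch sits in the unbounded buffer.
        *reservation.lock().unwrap() += size;
        if tx.send(batch).is_err() {
            // The receiver hung up (e.g. an early LIMIT shutdown): release
            // the accounted memory and stop sending to this partition.
            *reservation.lock().unwrap() -= size;
            txs.remove(&partition);
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<Batch>();
    let reservation = Arc::new(Mutex::new(0usize));
    let mut txs = HashMap::from([(0, (tx, Arc::clone(&reservation)))]);

    send_to_partition(&mut txs, 0, vec![0u8; 64]);
    assert_eq!(*reservation.lock().unwrap(), 64);

    drop(rx); // receiver shuts down early
    send_to_partition(&mut txs, 0, vec![0u8; 64]);
    assert_eq!(*reservation.lock().unwrap(), 64); // second send rolled back
    assert!(txs.is_empty());
}
```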
@@ -544,7 +520,7 @@ impl RepartitionExec {
     /// channels.
     async fn wait_for_task(
         input_task: AbortOnDropSingle<Result<()>>,
-        txs: HashMap<usize, DistributionSender<Option<Result<RecordBatch>>>>,
+        txs: HashMap<usize, UnboundedSender<Option<Result<RecordBatch>>>>,
     ) {
         // wait for completion, and propagate error
         // note we ignore errors on send (.ok) as that means the receiver has already shutdown.
@@ -558,7 +534,7 @@ impl RepartitionExec {
                         "Join Error".to_string(),
                         Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))),
                     ));
-                    tx.send(Some(err)).await.ok();
+                    tx.send(Some(err)).ok();
                 }
             }
             // Error from running input task
@@ -568,14 +544,14 @@ impl RepartitionExec {
                 for (_, tx) in txs {
                     // wrap it because need to send error to all output partitions
                     let err = Err(DataFusionError::External(Box::new(e.clone())));
-                    tx.send(Some(err)).await.ok();
+                    tx.send(Some(err)).ok();
                 }
             }
             // Input task completed successfully
             Ok(Ok(())) => {
                 // notify each output partition that this input partition has no more data
                 for (_, tx) in txs {
-                    tx.send(None).await.ok();
+                    tx.send(None).ok();
                 }
             }
         }
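
`wait_for_task` has to deliver exactly one terminal message to every output partition: a shared copy of the error if the input task failed, or `None` as the end-of-stream marker if it completed. With unbounded senders that fan-out is a plain synchronous loop. A sketch with a simplified message type standing in for `Option<Result<RecordBatch>>`:

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{self, UnboundedSender};

// Simplified message: Some(Err(_)) propagates a failure to a partition,
// None tells it this input has no more data.
type Message = Option<Result<Vec<u8>, Arc<String>>>;

fn fan_out(txs: HashMap<usize, UnboundedSender<Message>>, result: Result<(), Arc<String>>) {
    match result {
        // Input task failed: every partition gets its own handle to the
        // shared error; `.ok()` ignores partitions that already shut down.
        Err(e) => {
            for (_, tx) in txs {
                tx.send(Some(Err(Arc::clone(&e)))).ok();
            }
        }
        // Input task finished: signal end-of-stream everywhere.
        Ok(()) => {
            for (_, tx) in txs {
                tx.send(None).ok();
            }
        }
    }
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
    fan_out(HashMap::from([(0, tx)]), Err(Arc::new("boom".to_string())));
    assert!(matches!(rx.try_recv(), Ok(Some(Err(_)))));
}
```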
@@ -593,7 +569,7 @@ struct RepartitionStream {
     schema: SchemaRef,
 
     /// channel containing the repartitioned batches
-    input: DistributionReceiver<MaybeBatch>,
+    input: UnboundedReceiverStream<Option<Result<RecordBatch>>>,
 
     /// Handle to ensure background tasks are killed when no longer needed.
     #[allow(dead_code)]
@@ -611,7 +587,7 @@ impl Stream for RepartitionStream {
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         loop {
-            match self.input.recv().poll_unpin(cx) {
+            match self.input.poll_next_unpin(cx) {
                 Poll::Ready(Some(Some(v))) => {
                     if let Ok(batch) = &v {
                         self.reservation
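
On the receive side, wrapping the `UnboundedReceiver` in `tokio_stream::wrappers::UnboundedReceiverStream` is what lets the restored `poll_next` call `poll_next_unpin` from `futures::StreamExt` instead of the removed `recv().poll_unpin(cx)` construction. A sketch of that wiring (assumes the `tokio`, `tokio-stream`, and `futures` crates):

```rust
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<u64>();

    // The wrapper implements futures::Stream, so `next()`/`poll_next_unpin`
    // become available, just as in RepartitionStream::poll_next.
    let mut stream = UnboundedReceiverStream::new(rx);

    tx.send(7).ok();
    drop(tx);

    assert_eq!(stream.next().await, Some(7)); // buffered item
    assert_eq!(stream.next().await, None); // channel closed
}
```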