Skip to content

Commit

Permalink
docs: improve
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
crepererum and alamb committed Jan 13, 2023
1 parent 0080abe commit 632a5a8
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! Special channel construction to distribute data from varios inputs into N outputs.
//! Special channel construction to distribute data from various inputs into N outputs
//! minimizing buffering but preventing deadlocks when repartitoning
//!
//! # Design
//!
Expand Down Expand Up @@ -106,9 +107,10 @@ impl<T> std::error::Error for SendError<T> {}
///
/// This handle can be cloned. All clones will write into the same channel. Dropping the last sender will close the
/// channel. In this case, the [receiver](DistributionReceiver) will still be able to poll the remaining data, but will
/// receiver `None` afterwards.
/// receive `None` afterwards.
#[derive(Debug)]
pub struct DistributionSender<T> {
/// To prevent lock inversion / deadlock, channel lock is always acquired prior to gate lock
channel: SharedChannel<T>,
gate: SharedGate,
}
Expand Down Expand Up @@ -185,6 +187,7 @@ impl<'a, T> Future for SendFuture<'a, T> {
let mut guard_gate = this.gate.lock();

// does ANY receiver need data?
// if so, allow sender to create another
if guard_gate.empty_channels == 0 {
guard_gate
.send_wakers
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ impl ExecutionPlan for RepartitionExec {
// if this is the first partition to be invoked then we need to set up initial state
if state.channels.is_empty() {
// create one channel per *output* partition
// note we use a custom channel that ensures there is always data for each receiver
// but limits the amount of buffering if required.
let (txs, rxs) = channels(num_output_partitions);
for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
let reservation = Arc::new(Mutex::new(
Expand Down

0 comments on commit 632a5a8

Please sign in to comment.