From 656f9b65fe06540023d00a94cc6f0761f9eeee25 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 13 Jan 2023 08:30:42 +0100 Subject: [PATCH] docs: improve Co-authored-by: Andrew Lamb --- .../src/physical_plan/repartition/distributor_channels.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/repartition/distributor_channels.rs b/datafusion/core/src/physical_plan/repartition/distributor_channels.rs index 1eedf0d707be5..519218f2c2361 100644 --- a/datafusion/core/src/physical_plan/repartition/distributor_channels.rs +++ b/datafusion/core/src/physical_plan/repartition/distributor_channels.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -//! Special channel construction to distribute data from varios inputs into N outputs. +//! Special channel construction to distribute data from various inputs into N outputs +//! minimizing buffering but preventing deadlocks when repartitoning //! //! # Design //! @@ -106,9 +107,10 @@ impl std::error::Error for SendError {} /// /// This handle can be cloned. All clones will write into the same channel. Dropping the last sender will close the /// channel. In this case, the [receiver](DistributionReceiver) will still be able to poll the remaining data, but will -/// receiver `None` afterwards. +/// receive `None` afterwards. #[derive(Debug)] pub struct DistributionSender { + /// To prevent lock inversion / deadlock, channel lock is always acquired prior to gate lock channel: SharedChannel, gate: SharedGate, } @@ -185,6 +187,7 @@ impl<'a, T> Future for SendFuture<'a, T> { let mut guard_gate = this.gate.lock(); // does ANY receiver need data? + // if so, allow sender to create another if guard_gate.empty_channels == 0 { guard_gate .send_wakers