broadcast: Stop notifying after we've woken all wakers #5925
Conversation
Thanks for the PR!

Hmm, there's an alternate approach: move everything in the list into a separate list immediately, then empty the separate list. I think that I would prefer that approach because it is simpler to verify the correctness (no need to reason about the maximum length). Additionally, we already do this elsewhere, so it makes us more consistent across the codebase. You can find an existing implementation of this in `tokio/tokio/src/sync/notify.rs`, lines 618 to 691 at commit ad58664.

Would you be up for doing that?
@Darksonn done, I think
Within `notify_rx`, looping while re-locking and re-reading from `Shared.tail` as long as there are still available wakers causes a quadratic slowdown as receivers which are looping receiving from the channel are added. Instead of continually re-reading from the original list, this commit modifies `notify_rx` to move the waiters into a separate list immediately, similar to how `Notify::notify_waiters` works, using a new `WaitersList` struct modeled after `NotifyWaitersList`. Fixes #5923
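For readers skimming the thread, here is a rough, hypothetical sketch of the pattern the commit message describes, using an owned `VecDeque<Waker>` in place of tokio's intrusive `GuardedLinkedList` (the names here are illustrative, not tokio's internal API). An owned deque sidesteps the concurrent self-removal problem the real intrusive list has to handle, which is exactly why the real code keeps the `waiters` lock while touching entries:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::task::Waker;

// Hypothetical, simplified stand-in for the shared channel state.
struct Shared {
    tail: Mutex<VecDeque<Waker>>,
}

impl Shared {
    fn notify_rx(&self) {
        // Move the waiters out in one step instead of re-locking and
        // re-reading `tail` on every pass: receivers that re-queue
        // themselves while we are waking are left for the next
        // notification, so a single call does O(n) work rather than
        // chasing a moving target.
        let mut list = std::mem::take(&mut *self.tail.lock().unwrap());
        while let Some(waker) = list.pop_back() {
            waker.wake();
        }
    }
}
```

The key point is that the waiters are taken out of the shared `tail` exactly once, so receivers that re-queue themselves mid-notification are handled by a later notification instead of extending this one.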
Looks reasonable to me. Does this version also fix the performance issue?
I believe the scope of the lock can be reduced. The method would become:

```rust
impl<T> Shared<T> {
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `waiters` lock.
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        let mut wakers = WakeList::new();
        'outer: loop {
            // Because all waiters in `tail.waiters` have been moved to the
            // secondary list, we can drop the lock here.
            drop(tail);

            while wakers.can_push() {
                match list.pop_back() {
                    Some(mut waiter) => {
                        // Safety: `tail` lock is still held.
                        let waiter = unsafe { waiter.as_mut() };

                        assert!(waiter.queued);
                        waiter.queued = false;

                        if let Some(waker) = waiter.waker.take() {
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.
            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        wakers.wake_all();
    }
}
```

The corresponding diff:

```diff
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index 42cde81d..f46e5a20 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -865,7 +865,7 @@ impl<'a, T> WaitersList<'a, T> {
     /// Removes the last element from the guarded list. Modifying this list
     /// requires an exclusive access to the main list in `Notify`.
-    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
+    fn pop_back(&mut self) -> Option<NonNull<Waiter>> {
         let result = self.list.pop_back();
         if result.is_none() {
             // Save information about emptiness to avoid waiting for lock
@@ -895,8 +895,12 @@ impl<T> Shared<T> {
         let mut wakers = WakeList::new();
         'outer: loop {
+
+            // Release the lock before waking.
+            drop(tail);
+
             while wakers.can_push() {
-                match list.pop_back_locked(&mut tail) {
+                match list.pop_back() {
                     Some(mut waiter) => {
                         // Safety: `tail` lock is still held.
                         let waiter = unsafe { waiter.as_mut() };
@@ -914,10 +918,7 @@ impl<T> Shared<T> {
                 }
             }
-            // Release the lock before waking.
-            drop(tail);
-
-            // Before we acquire the lock again all sorts of things can happen:
+            // Before we acquire the lock again all sorts of things can happen:
             // some waiters may remove themselves from the list and new waiters
             // may be added. This is fine since at worst we will unnecessarily
             // wake up waiters which will then queue themselves again.
@@ -928,9 +929,6 @@ impl<T> Shared<T> {
             tail = self.tail.lock();
         }
-        // Release the lock before waking.
-        drop(tail);
-
         wakers.wake_all();
     }
 }
@@ -1512,3 +1510,4 @@ mod tests {
         assert_eq!(sender.receiver_count(), 2);
     }
 }
+
```
No, we can't do that. It is important that we hold the lock when calling `waiter.as_mut()`.
You are right, the lock should be held while running the loop. The code above may cause a data race.
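To make the point above concrete, here is a minimal, hypothetical sketch of the locking discipline the merged version keeps, again with std types standing in for tokio's intrusive list and `WakeList` (the `WAKE_BATCH` constant and `Vec<Waker>` are simplifications, not tokio's API). Shared waiter state is only touched while the lock is held, so a concurrently dropping `Recv` that also takes the lock to remove its waiter can never observe a half-updated entry; the lock is released only around the wakeups themselves:

```rust
use std::sync::Mutex;
use std::task::Waker;

// Hypothetical, simplified stand-in for the shared channel state.
struct Shared {
    waiters: Mutex<Vec<Waker>>,
}

// Stand-in for WakeList's fixed capacity.
const WAKE_BATCH: usize = 32;

impl Shared {
    fn notify_rx(&self) {
        loop {
            // Mutate shared waiter state only while holding the lock.
            let mut tail = self.waiters.lock().unwrap();
            let split = tail.len().saturating_sub(WAKE_BATCH);
            let batch: Vec<Waker> = tail.drain(split..).collect();
            // Release the lock only for the wakeups themselves.
            drop(tail);
            if batch.is_empty() {
                return;
            }
            for waker in batch {
                waker.wake();
            }
        }
    }
}
```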
Thanks!
Upgrade the version of tokio we depend on to version 1.32, to get the version with tokio-rs/tokio#5925, my fix for a performance issue in `tokio::sync::broadcast`. We use this to notify workers when channels are removed from the channel coordinator, so we want this fix to improve the performance of that process.

Change-Id: Id45707e0f95ca0a76ea69952ea23e8c65f846983
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5883
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <[email protected]>
Motivation

Fix #5923

Solution

Unboundedly looping within `notify_rx` as long as there are still available wakers causes a quadratic slowdown as receivers which are looping receiving from the channel are added. Instead of continually waiting for new wakers, this commit modifies `notify_rx` to stop trying to wake wakers once we've notified a number of wakers greater than or equal to the number of active wakers at the time we started notifying.
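For reference, a minimal sketch of the counting approach this description proposes (hypothetical, simplified types; the PR as merged replaced this with the list-move approach discussed in the conversation above):

```rust
use std::sync::Mutex;
use std::task::Waker;

// Hypothetical, simplified stand-in for the shared channel state.
struct Shared {
    tail: Mutex<Vec<Waker>>,
}

impl Shared {
    fn notify_rx(&self) {
        // Snapshot how many waiters were queued when notification started;
        // waking more than this many would mean re-waking receivers that
        // queued themselves again mid-notification.
        let initial = self.tail.lock().unwrap().len();
        let mut woken = 0;
        while woken < initial {
            let mut tail = self.tail.lock().unwrap();
            let Some(waker) = tail.pop() else { break };
            // Release the lock before waking.
            drop(tail);
            waker.wake();
            woken += 1;
        }
    }
}
```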