Skip to content

Commit

Permalink
sync: update Notify to use WakeList
Browse files Browse the repository at this point in the history
This commit updates `tokio::sync::Notify` to use the `WakeList` type
added in PR #4055. This may improve performance somewhat, as it will
avoid initializing a bunch of empty `Option`s when waking.

I'd like to make similar changes to `BatchSemaphore`, but this is a
somewhat larger change, as the wakers stored in the array are not
`Waker`s but an internal type, and permit assigning operations are
performed prior to waking.
  • Loading branch information
hawkw committed Aug 25, 2021
1 parent 80bda3b commit 0d703b3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 18 deletions.
21 changes: 6 additions & 15 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
Expand Down Expand Up @@ -391,10 +392,7 @@ impl Notify {
/// }
/// ```
pub fn notify_waiters(&self) {
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut curr_waker = 0;
let mut wakers = WakeList::new();

// There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock();
Expand All @@ -414,7 +412,7 @@ impl Notify {
// concurrently change, as holding the lock is required to
// transition **out** of `WAITING`.
'outer: loop {
while curr_waker < NUM_WAKERS {
while wakers.can_push() {
match waiters.pop_back() {
Some(mut waiter) => {
// Safety: `waiters` lock is still held.
Expand All @@ -425,8 +423,7 @@ impl Notify {
waiter.notified = Some(NotificationType::AllWaiters);

if let Some(waker) = waiter.waker.take() {
wakers[curr_waker] = Some(waker);
curr_waker += 1;
wakers.push(waker);
}
}
None => {
Expand All @@ -437,11 +434,7 @@ impl Notify {

drop(waiters);

for waker in wakers.iter_mut().take(curr_waker) {
waker.take().unwrap().wake();
}

curr_waker = 0;
wakers.wake_all();

// Acquire the lock again.
waiters = self.waiters.lock();
Expand All @@ -456,9 +449,7 @@ impl Notify {
// Release the lock before notifying
drop(waiters);

for waker in wakers.iter_mut().take(curr_waker) {
waker.take().unwrap().wake();
}
wakers.wake_all();
}
}

Expand Down
8 changes: 5 additions & 3 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
cfg_io_driver! {
pub(crate) mod bit;
pub(crate) mod slab;

mod wake_list;
pub(crate) use wake_list::WakeList;
}

#[cfg(any(feature = "io-driver", feature = "sync"))]
mod wake_list;
#[cfg(any(feature = "io-driver", feature = "sync"))]
pub(crate) use wake_list::WakeList;

#[cfg(any(
feature = "fs",
feature = "net",
Expand Down

0 comments on commit 0d703b3

Please sign in to comment.