diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 2ea63591481..74b97cc481c 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -8,6 +8,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::linked_list::{self, LinkedList}; +use crate::util::WakeList; use std::cell::UnsafeCell; use std::future::Future; @@ -391,10 +392,7 @@ impl Notify { /// } /// ``` pub fn notify_waiters(&self) { - const NUM_WAKERS: usize = 32; - - let mut wakers: [Option; NUM_WAKERS] = Default::default(); - let mut curr_waker = 0; + let mut wakers = WakeList::new(); // There are waiters, the lock must be acquired to notify. let mut waiters = self.waiters.lock(); @@ -414,7 +412,7 @@ impl Notify { // concurrently change, as holding the lock is required to // transition **out** of `WAITING`. 'outer: loop { - while curr_waker < NUM_WAKERS { + while wakers.can_push() { match waiters.pop_back() { Some(mut waiter) => { // Safety: `waiters` lock is still held. @@ -425,8 +423,7 @@ impl Notify { waiter.notified = Some(NotificationType::AllWaiters); if let Some(waker) = waiter.waker.take() { - wakers[curr_waker] = Some(waker); - curr_waker += 1; + wakers.push(waker); } } None => { @@ -437,11 +434,7 @@ impl Notify { drop(waiters); - for waker in wakers.iter_mut().take(curr_waker) { - waker.take().unwrap().wake(); - } - - curr_waker = 0; + wakers.wake_all(); // Acquire the lock again. waiters = self.waiters.lock(); @@ -456,9 +449,7 @@ impl Notify { // Release the lock before notifying drop(waiters); - for waker in wakers.iter_mut().take(curr_waker) { - waker.take().unwrap().wake(); - } + wakers.wake_all(); } } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 300e06bc49e..21595557518 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -1,11 +1,13 @@ cfg_io_driver! { pub(crate) mod bit; pub(crate) mod slab; - - mod wake_list; - pub(crate) use wake_list::WakeList; } +#[cfg(any(feature = "io-driver", feature = "sync"))] +mod wake_list; +#[cfg(any(feature = "io-driver", feature = "sync"))] +pub(crate) use wake_list::WakeList; + #[cfg(any( feature = "fs", feature = "net",