Skip to content

Commit

Permalink
sync: use WakeList in Notify and batch_semaphore (#4071)
Browse files Browse the repository at this point in the history
## Motivation

PR #4055 added a new `WakeList` type, to manage a potentially
uninitialized array when waking batches of wakers. This has the
advantage of not initializing a bunch of empty `Option`s when only a
small number of tasks are being woken, potentially improving performance
in these cases.

Currently, `WakeList` is used only in the IO driver. However,
`tokio::sync` contains some code that's almost identical to the code in
the IO driver that was replaced with `WakeList`, so we can apply the
same optimizations there.

## Solution

This branch changes `tokio::sync::Notify` and
`tokio::sync::batch_semaphore::Semaphore` to use `WakeList` when waking
batches of wakers. This was a pretty straightforward drop-in
replacement.

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Aug 26, 2021
1 parent 80bda3b commit 1e2e38b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 25 deletions.
16 changes: 9 additions & 7 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::future::Future;
use std::marker::PhantomPinned;
Expand Down Expand Up @@ -239,12 +240,12 @@ impl Semaphore {
/// If `rem` exceeds the number of permits needed by the wait list, the
/// remainder are assigned back to the semaphore.
fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
let mut wakers: [Option<Waker>; 8] = Default::default();
let mut wakers = WakeList::new();
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
'inner: for slot in &mut wakers[..] {
'inner: while wakers.can_push() {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
Some(waiter) => {
Expand All @@ -260,7 +261,11 @@ impl Semaphore {
}
};
let mut waiter = waiters.queue.pop_back().unwrap();
*slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
if let Some(waker) =
unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
{
wakers.push(waker);
}
}

if rem > 0 && is_empty {
Expand All @@ -283,10 +288,7 @@ impl Semaphore {

drop(waiters); // release the lock

wakers
.iter_mut()
.filter_map(Option::take)
.for_each(Waker::wake);
wakers.wake_all();
}

assert_eq!(rem, 0);
Expand Down
21 changes: 6 additions & 15 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
Expand Down Expand Up @@ -391,10 +392,7 @@ impl Notify {
/// }
/// ```
pub fn notify_waiters(&self) {
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut curr_waker = 0;
let mut wakers = WakeList::new();

// There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock();
Expand All @@ -414,7 +412,7 @@ impl Notify {
// concurrently change, as holding the lock is required to
// transition **out** of `WAITING`.
'outer: loop {
while curr_waker < NUM_WAKERS {
while wakers.can_push() {
match waiters.pop_back() {
Some(mut waiter) => {
// Safety: `waiters` lock is still held.
Expand All @@ -425,8 +423,7 @@ impl Notify {
waiter.notified = Some(NotificationType::AllWaiters);

if let Some(waker) = waiter.waker.take() {
wakers[curr_waker] = Some(waker);
curr_waker += 1;
wakers.push(waker);
}
}
None => {
Expand All @@ -437,11 +434,7 @@ impl Notify {

drop(waiters);

for waker in wakers.iter_mut().take(curr_waker) {
waker.take().unwrap().wake();
}

curr_waker = 0;
wakers.wake_all();

// Acquire the lock again.
waiters = self.waiters.lock();
Expand All @@ -456,9 +449,7 @@ impl Notify {
// Release the lock before notifying
drop(waiters);

for waker in wakers.iter_mut().take(curr_waker) {
waker.take().unwrap().wake();
}
wakers.wake_all();
}
}

Expand Down
26 changes: 23 additions & 3 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
cfg_io_driver! {
pub(crate) mod bit;
pub(crate) mod slab;

mod wake_list;
pub(crate) use wake_list::WakeList;
}

#[cfg(any(
// io driver uses `WakeList` directly
feature = "net",
feature = "process",
// `sync` enables `Notify` and `batch_semaphore`, which require `WakeList`.
feature = "sync",
// `fs` uses `batch_semaphore`, which requires `WakeList`.
feature = "fs",
// rt and signal use `Notify`, which requires `WakeList`.
feature = "rt",
feature = "signal",
))]
mod wake_list;
#[cfg(any(
feature = "net",
feature = "process",
feature = "sync",
feature = "fs",
feature = "rt",
feature = "signal",
))]
pub(crate) use wake_list::WakeList;

#[cfg(any(
feature = "fs",
feature = "net",
Expand Down

0 comments on commit 1e2e38b

Please sign in to comment.