Skip to content

Commit

Permalink
rt: reduce no-op wakeups in the multi-threaded scheduler (#4383)
Browse files Browse the repository at this point in the history
This patch reduces the number of times worker threads wake up without having
work to do in the multi-threaded scheduler. Unnecessary wake-ups are expensive
and slow down the scheduler. I have observed this change reduce no-op wakes
by up to 50%.

The multi-threaded scheduler is work-stealing. When a worker has tasks to process,
and other workers are idle (parked), these idle workers must be unparked so that
they can steal work from the busy worker. However, unparking threads is expensive,
so there is an optimization that avoids unparking a worker if there already exists
workers in a "searching" state (the worker is unparked and looking for work). This
works pretty well, but transitioning from 1 "searching" worker to 0 searching workers
introduces a race condition where a thread unpark can be lost:

* thread 1: last searching worker about to exit searching state
* thread 2: needs to unpark a thread, but skip because there is a searching worker.
* thread 1: exits searching state w/o seeing thread 2's work.

Because this should be a rare condition, Tokio solves this by always unparking a
new worker when the current worker:

* is the last searching worker
* is transitioning out of searching
* has work to process.

When the newly unparked worker wakes, if the race condition described above
happened, "thread 2"'s work will be found. Otherwise, it will just go back to sleep.

Now we come to the issue at hand. A bug incorrectly set a worker to "searching"
when the I/O driver unparked the thread. In a situation where the scheduler was
only partially under load and is able to operate with 1 active worker, the I/O driver
would unpark the thread when new I/O events are received, incorrectly transition
it to "searching", find new work generated by inbound I/O events, incorrectly
transition itself from the last searcher -> no searchers, and unpark a new thread.
This new thread would wake, find no work and go back to sleep.

Note that, when the scheduler is fully saturated, this change will make no impact
as most workers are always unparked and the optimization to avoid unparking
threads described at the top apply.
  • Loading branch information
carllerche authored Jan 13, 2022
1 parent 16a8404 commit 4eed411
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
16 changes: 10 additions & 6 deletions tokio/src/runtime/thread_pool/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Idle {

// A worker should be woken up, atomically increment the number of
// searching workers as well as the number of unparked workers.
State::unpark_one(&self.state);
State::unpark_one(&self.state, 1);

// Get the worker to unpark
let ret = sleepers.pop();
Expand Down Expand Up @@ -111,19 +111,23 @@ impl Idle {

/// Unpark a specific worker. This happens if tasks are submitted from
/// within the worker's park routine.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
///
/// Returns `true` if the worker was parked before calling the method.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool {
let mut sleepers = self.sleepers.lock();

for index in 0..sleepers.len() {
if sleepers[index] == worker_id {
sleepers.swap_remove(index);

// Update the state accordingly while the lock is held.
State::unpark_one(&self.state);
State::unpark_one(&self.state, 0);

return;
return true;
}
}

false
}

/// Returns `true` if `worker_id` is contained in the sleep set.
Expand Down Expand Up @@ -151,8 +155,8 @@ impl State {
State(cell.load(ordering))
}

fn unpark_one(cell: &AtomicUsize) {
cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
}

fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
Expand Down
12 changes: 8 additions & 4 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,9 @@ impl Context {
// Place `park` back in `core`
core.park = Some(park);

// If there are tasks available to steal, notify a worker
if core.run_queue.is_stealable() {
// If there are tasks available to steal, but this worker is not
// looking for tasks to steal, notify another worker.
if !core.is_searching && core.run_queue.is_stealable() {
self.worker.shared.notify_parked();
}

Expand Down Expand Up @@ -621,8 +622,11 @@ impl Core {
// If a task is in the lifo slot, then we must unpark regardless of
// being notified
if self.lifo_slot.is_some() {
worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = true;
// When a worker wakes, it should only transition to the "searching"
// state when the wake originates from another worker *or* a new task
// is pushed. We do *not* want the worker to transition to "searching"
// when it wakes when the I/O driver receives new events.
self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index);
return true;
}

Expand Down

0 comments on commit 4eed411

Please sign in to comment.