Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed-up waker by using uninitialized array #4055

Merged
merged 16 commits into from
Aug 25, 2021
27 changes: 8 additions & 19 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry;
use crate::util::WakeList;

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
Expand Down Expand Up @@ -212,10 +213,7 @@ impl ScheduledIo {
}

fn wake0(&self, ready: Ready, shutdown: bool) {
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut curr = 0;
let mut wakers = WakeList::new();
Comment on lines 215 to +216
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a thought: Are we not protecting against panics here in the wrong way? Should we really be panicking and not waking the other wakers just because someone gave us a waker that emitted a panic?

Maybe we should just catch all panics that happen when calling wake and ignore them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the typical scenario of waker panic? Does it invoke polling directly? I'm not familiar with all internals, but it seems like there should be some validation of the solution. Writing the proper test cases with panicking would definitely help.

Copy link
Contributor

@Darksonn Darksonn Aug 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These wakers are user supplied and could run literally any code in the wake call. That said, a well behaved waker should never panic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we really be panicking and not waking the other wakers just because someone gave us a waker that emitted a panic?

Hmm, that's a good point. If one waker panics, failing to wake the others could result in those tasks never being notified.

On the other hand, what's the overhead of adding a catch_unwind in this fairly hot loop? Is that worth introducing to handle a case which can only happen if a user-supplied waker is not "well-behaved"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still have the destructor call wake on the others without a catch_unwind.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still have the destructor call wake on the others without a catch_unwind.

A destructor panicing would be even worse, since it could cause a double panic, which would result into an abort

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't user-supplied waker's Drop implementation panic as well? This will cause double panic in the Drop implementation even without waking

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if the wakers panic in the destructor, you can get a double abort.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it's absolutely critical to not panic in any of the waker functions. The alternative approach is not to try to fix the erroneous implementation, but to forcefully abort the execution


let mut waiters = self.waiters.lock();

Expand All @@ -224,32 +222,29 @@ impl ScheduledIo {
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
wakers[curr] = Some(waker);
curr += 1;
wakers.push(waker);
}
}

// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
wakers[curr] = Some(waker);
curr += 1;
wakers.push(waker);
}
}

#[cfg(feature = "net")]
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

while curr < NUM_WAKERS {
while wakers.can_push() {
match iter.next() {
Some(waiter) => {
let waiter = unsafe { &mut *waiter.as_ptr() };

if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
wakers[curr] = Some(waker);
curr += 1;
wakers.push(waker);
}
}
None => {
Expand All @@ -260,11 +255,7 @@ impl ScheduledIo {

drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}

curr = 0;
wakers.wake_all();

// Acquire the lock again.
waiters = self.waiters.lock();
Expand All @@ -273,9 +264,7 @@ impl ScheduledIo {
// Release the lock before notifying
drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}
wakers.wake_all();
}

pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ cfg_rt! {

mod vec_deque_cell;
pub(crate) use vec_deque_cell::VecDequeCell;

mod wake_list;
pub(crate) use wake_list::WakeList;
}

cfg_rt_multi_thread! {
Expand Down
44 changes: 44 additions & 0 deletions tokio/src/util/wake_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use core::mem::{self, MaybeUninit};
use std::task::Waker;

const NUM_WAKERS: usize = 32;

pub(crate) struct WakeList {
inner: [MaybeUninit<Waker>; NUM_WAKERS],
curr: usize,
}

impl WakeList {
pub(crate) fn new() -> Self {
Self {
inner: unsafe { MaybeUninit::uninit().assume_init() },
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
curr: 0,
}
}

pub(crate) fn can_push(&self) -> bool {
self.curr < NUM_WAKERS - 1
}

pub(crate) fn push(&mut self, val: Waker) {
debug_assert!(self.can_push());

self.inner[self.curr] = MaybeUninit::new(val);
self.curr += 1;
}

pub(crate) fn wake_all(&mut self) {
for waker in &mut self.inner[..self.curr] {
unsafe { mem::replace(waker, MaybeUninit::uninit()).assume_init() }.wake()
}
self.curr = 0;
glebpom marked this conversation as resolved.
Show resolved Hide resolved
glebpom marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl Drop for WakeList {
fn drop(&mut self) {
for waker in &mut self.inner[..self.curr] {
mem::drop(unsafe { mem::replace(waker, MaybeUninit::uninit()).assume_init() });
}
glebpom marked this conversation as resolved.
Show resolved Hide resolved
}
}