From 51f4f0594c81fe24919055e0c08be374ee54a14b Mon Sep 17 00:00:00 2001 From: Gleb Pomykalov Date: Wed, 25 Aug 2021 17:18:19 +0300 Subject: [PATCH] io: speed-up waking by using uninitialized array (#4055) --- tokio/src/io/driver/scheduled_io.rs | 27 +++++------------ tokio/src/util/mod.rs | 3 ++ tokio/src/util/wake_list.rs | 47 +++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 19 deletions(-) create mode 100644 tokio/src/util/wake_list.rs diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 51780107909..a2657203fb5 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; use crate::util::slab::Entry; +use crate::util::WakeList; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; @@ -212,10 +213,7 @@ impl ScheduledIo { } fn wake0(&self, ready: Ready, shutdown: bool) { - const NUM_WAKERS: usize = 32; - - let mut wakers: [Option; NUM_WAKERS] = Default::default(); - let mut curr = 0; + let mut wakers = WakeList::new(); let mut waiters = self.waiters.lock(); @@ -224,16 +222,14 @@ impl ScheduledIo { // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } // check for AsyncWrite slot if ready.is_writable() { if let Some(waker) = waiters.writer.take() { - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } @@ -241,15 +237,14 @@ impl ScheduledIo { 'outer: loop { let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); - while curr < NUM_WAKERS { + while wakers.can_push() { match iter.next() { Some(waiter) => { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { waiter.is_ready = true; - wakers[curr] = Some(waker); - curr += 1; + wakers.push(waker); } } None => { @@ -260,11 +255,7 @@ impl ScheduledIo { drop(waiters); - for waker in wakers.iter_mut().take(curr) { - waker.take().unwrap().wake(); - } - - curr = 0; + wakers.wake_all(); // Acquire the lock again. waiters = self.waiters.lock(); @@ -273,9 +264,7 @@ impl ScheduledIo { // Release the lock before notifying drop(waiters); - for waker in wakers.iter_mut().take(curr) { - waker.take().unwrap().wake(); - } + wakers.wake_all(); } pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 9065f50a836..300e06bc49e 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -1,6 +1,9 @@ cfg_io_driver! { pub(crate) mod bit; pub(crate) mod slab; + + mod wake_list; + pub(crate) use wake_list::WakeList; } #[cfg(any( diff --git a/tokio/src/util/wake_list.rs b/tokio/src/util/wake_list.rs new file mode 100644 index 00000000000..2a5268adb56 --- /dev/null +++ b/tokio/src/util/wake_list.rs @@ -0,0 +1,47 @@ +use core::mem::MaybeUninit; +use core::ptr; +use std::task::Waker; + +const NUM_WAKERS: usize = 32; + +pub(crate) struct WakeList { + inner: [MaybeUninit; NUM_WAKERS], + curr: usize, +} + +impl WakeList { + pub(crate) fn new() -> Self { + Self { + inner: unsafe { MaybeUninit::uninit().assume_init() }, + curr: 0, + } + } + + #[inline] + pub(crate) fn can_push(&self) -> bool { + self.curr < NUM_WAKERS + } + + pub(crate) fn push(&mut self, val: Waker) { + debug_assert!(self.can_push()); + + self.inner[self.curr] = MaybeUninit::new(val); + self.curr += 1; + } + + pub(crate) fn wake_all(&mut self) { + assert!(self.curr <= NUM_WAKERS); + while self.curr > 0 { + self.curr -= 1; + let waker = unsafe { ptr::read(self.inner[self.curr].as_mut_ptr()) }; + waker.wake(); + } + } +} + +impl Drop for WakeList { + fn drop(&mut self) { + let slice = ptr::slice_from_raw_parts_mut(self.inner.as_mut_ptr() as *mut Waker, self.curr); + unsafe { ptr::drop_in_place(slice) }; + } +}