From b5cd62ea64e318b7e2764a0040dbde77039ed9e3 Mon Sep 17 00:00:00 2001 From: Nur Date: Sun, 10 Nov 2024 19:00:22 +0600 Subject: [PATCH 1/4] io: remove redundant check --- tokio/src/runtime/io/scheduled_io.rs | 86 +++++++++------------------- 1 file changed, 28 insertions(+), 58 deletions(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index ee6977c00e7..77ef636b877 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -206,43 +206,24 @@ impl ScheduledIo { /// specific tick. /// - `f`: a closure returning a new readiness value given the previous /// readiness. - pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) { - let mut current = self.readiness.load(Acquire); - - // If the io driver is shut down, then you are only allowed to clear readiness. - debug_assert!(SHUTDOWN.unpack(current) == 0 || matches!(tick, Tick::Clear(_))); - - loop { - // Mask out the tick bits so that the modifying function doesn't see - // them. - let current_readiness = Ready::from_usize(current); - let new = f(current_readiness); - - let new_tick = match tick { - Tick::Set => { - let current = TICK.unpack(current); - current.wrapping_add(1) % (TICK.max_value() + 1) - } - Tick::Clear(t) => { - if TICK.unpack(current) as u8 != t { - // Trying to clear readiness with an old event! - return; - } - - t as usize - } + pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) { + let update = |curr| { + // If the io driver is shut down, then you are only allowed to clear readiness. + debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_))); + + const MAX_TICK: usize = TICK.max_value() + 1; + let tick = TICK.unpack(curr); + + let new_tick = match tick_op { + // Trying to clear readiness with an old event! + Tick::Clear(t) if tick as u8 != t => return None, + Tick::Clear(t) => t as usize, + Tick::Set => tick.wrapping_add(1) % MAX_TICK, }; - let next = TICK.pack(new_tick, new.as_usize()); - - match self - .readiness - .compare_exchange(current, next, AcqRel, Acquire) - { - Ok(_) => return, - // we lost the race, retry! - Err(actual) => current = actual, - } - } + + Some(TICK.pack(new_tick, f(Ready::from_usize(curr)).as_usize())) + }; + let _ = self.readiness.fetch_update(AcqRel, Acquire, update); } /// Notifies all pending waiters that have registered interest in `ready`. @@ -312,7 +293,7 @@ impl ScheduledIo { ReadyEvent { tick: TICK.unpack(curr) as u8, - ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + ready: interest.mask() & Ready::from_usize(curr), is_shutdown: SHUTDOWN.unpack(curr) != 0, } } @@ -329,34 +310,28 @@ impl ScheduledIo { ) -> Poll { let curr = self.readiness.load(Acquire); - let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + let ready = direction.mask() & Ready::from_usize(curr); let is_shutdown = SHUTDOWN.unpack(curr) != 0; if ready.is_empty() && !is_shutdown { // Update the task info let mut waiters = self.waiters.lock(); - let slot = match direction { + let waker = match direction { Direction::Read => &mut waiters.reader, Direction::Write => &mut waiters.writer, }; // Avoid cloning the waker if one is already stored that matches the // current task. - match slot { - Some(existing) => { - if !existing.will_wake(cx.waker()) { - existing.clone_from(cx.waker()); - } - } - None => { - *slot = Some(cx.waker().clone()); - } + match waker { + Some(waker) => waker.clone_from(cx.waker()), + None => *waker = Some(cx.waker().clone()), } // Try again, in case the readiness was changed while we were // taking the waiters lock let curr = self.readiness.load(Acquire); - let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + let ready = direction.mask() & Ready::from_usize(curr); let is_shutdown = SHUTDOWN.unpack(curr) != 0; if is_shutdown { Poll::Ready(ReadyEvent { @@ -465,12 +440,11 @@ impl Future for Readiness<'_> { State::Init => { // Optimistically check existing readiness let curr = scheduled_io.readiness.load(SeqCst); - let ready = Ready::from_usize(READINESS.unpack(curr)); let is_shutdown = SHUTDOWN.unpack(curr) != 0; // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; - let ready = ready.intersection(interest); + let ready = Ready::from_usize(curr).intersection(interest); if !ready.is_empty() || is_shutdown { // Currently ready! @@ -487,7 +461,7 @@ impl Future for Readiness<'_> { let mut waiters = scheduled_io.waiters.lock(); let curr = scheduled_io.readiness.load(SeqCst); - let mut ready = Ready::from_usize(READINESS.unpack(curr)); + let mut ready = Ready::from_usize(curr); let is_shutdown = SHUTDOWN.unpack(curr) != 0; if is_shutdown { @@ -538,10 +512,7 @@ impl Future for Readiness<'_> { *state = State::Done; } else { // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); - } - + w.waker.as_mut().unwrap().clone_from(cx.waker()); return Poll::Pending; } @@ -566,8 +537,7 @@ impl Future for Readiness<'_> { // The readiness state could have been cleared in the meantime, // but we allow the returned ready set to be empty. - let curr_ready = Ready::from_usize(READINESS.unpack(curr)); - let ready = curr_ready.intersection(w.interest); + let ready = Ready::from_usize(curr).intersection(w.interest); return Poll::Ready(ReadyEvent { tick, From 61bf1b8ee64230163427b9707c89984575595f06 Mon Sep 17 00:00:00 2001 From: Nur Date: Tue, 12 Nov 2024 22:02:57 +0600 Subject: [PATCH 2/4] added bit unpacking from readiness --- tokio/src/runtime/io/scheduled_io.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index 77ef636b877..975f6234429 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -221,7 +221,8 @@ impl ScheduledIo { Tick::Set => tick.wrapping_add(1) % MAX_TICK, }; - Some(TICK.pack(new_tick, f(Ready::from_usize(curr)).as_usize())) + let ready = Ready::from_usize(READINESS.unpack(curr)); + Some(TICK.pack(new_tick, f(ready).as_usize())) }; let _ = self.readiness.fetch_update(AcqRel, Acquire, update); } @@ -293,7 +294,7 @@ impl ScheduledIo { ReadyEvent { tick: TICK.unpack(curr) as u8, - ready: interest.mask() & Ready::from_usize(curr), + ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), is_shutdown: SHUTDOWN.unpack(curr) != 0, } } @@ -310,7 +311,7 @@ impl ScheduledIo { ) -> Poll { let curr = self.readiness.load(Acquire); - let ready = direction.mask() & Ready::from_usize(curr); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); let is_shutdown = SHUTDOWN.unpack(curr) != 0; if ready.is_empty() && !is_shutdown { @@ -331,7 +332,7 @@ impl ScheduledIo { // Try again, in case the readiness was changed while we were // taking the waiters lock let curr = self.readiness.load(Acquire); - let ready = direction.mask() & Ready::from_usize(curr); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); let is_shutdown = SHUTDOWN.unpack(curr) != 0; if is_shutdown { Poll::Ready(ReadyEvent { @@ -444,7 +445,7 @@ impl Future for Readiness<'_> { // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; - let ready = Ready::from_usize(curr).intersection(interest); + let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest); if !ready.is_empty() || is_shutdown { // Currently ready! @@ -461,7 +462,7 @@ impl Future for Readiness<'_> { let mut waiters = scheduled_io.waiters.lock(); let curr = scheduled_io.readiness.load(SeqCst); - let mut ready = Ready::from_usize(curr); + let mut ready = Ready::from_usize(READINESS.unpack(curr)); let is_shutdown = SHUTDOWN.unpack(curr) != 0; if is_shutdown { @@ -537,7 +538,7 @@ impl Future for Readiness<'_> { // The readiness state could have been cleared in the meantime, // but we allow the returned ready set to be empty. - let ready = Ready::from_usize(curr).intersection(w.interest); + let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest); return Poll::Ready(ReadyEvent { tick, From 603cf98bc72e70681ec18ee15b024f7bae9404c1 Mon Sep 17 00:00:00 2001 From: Nur Date: Wed, 13 Nov 2024 20:05:33 +0600 Subject: [PATCH 3/4] Update tokio/src/runtime/io/scheduled_io.rs Co-authored-by: Alice Ryhl --- tokio/src/runtime/io/scheduled_io.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index 975f6234429..ec41134aad9 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -224,7 +224,9 @@ impl ScheduledIo { let ready = Ready::from_usize(READINESS.unpack(curr)); Some(TICK.pack(new_tick, f(ready).as_usize())) }; - let _ = self.readiness.fetch_update(AcqRel, Acquire, update); + let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| { + ... + }); } /// Notifies all pending waiters that have registered interest in `ready`. From 0126d4ae6de418f0887ade0e04c33d1f8d62512f Mon Sep 17 00:00:00 2001 From: Nur Date: Wed, 13 Nov 2024 20:11:25 +0600 Subject: [PATCH 4/4] io: format code --- tokio/src/runtime/io/scheduled_io.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index ec41134aad9..af57caed460 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -207,7 +207,7 @@ impl ScheduledIo { /// - `f`: a closure returning a new readiness value given the previous /// readiness. pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) { - let update = |curr| { + let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| { // If the io driver is shut down, then you are only allowed to clear readiness. debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_))); @@ -220,12 +220,8 @@ impl ScheduledIo { Tick::Clear(t) => t as usize, Tick::Set => tick.wrapping_add(1) % MAX_TICK, }; - let ready = Ready::from_usize(READINESS.unpack(curr)); Some(TICK.pack(new_tick, f(ready).as_usize())) - }; - let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| { - ... }); }