From 65670eb5513c2e8e8c16e9c15d7de8095c8cdae0 Mon Sep 17 00:00:00 2001
From: Weijia Jiang
Date: Fri, 29 Sep 2023 22:14:41 +0800
Subject: [PATCH] fix: rm segment_size field of OwnedTasks, use method to return it instead

---
 tokio/src/runtime/scheduler/multi_thread/worker.rs     |  2 +-
 .../src/runtime/scheduler/multi_thread_alt/worker.rs   |  2 +-
 tokio/src/runtime/task/list.rs                         | 11 +++++++----
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index b94d1c08ba1..776a42a4061 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -957,7 +957,7 @@ impl Core {
         // Start from a random inner list
         let start = self
             .rand
-            .fastrand_n(worker.handle.shared.owned.segment_size);
+            .fastrand_n(worker.handle.shared.owned.get_segment_size() as u32);
         // Signal to all tasks to shut down.
         worker
             .handle
diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
index 1533546e807..5f3dc0d0a6f 100644
--- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -1462,7 +1462,7 @@ impl Shared {
 
     pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
         // Start from a random inner list
-        let start = core.rand.fastrand_n(self.owned.segment_size as u32);
+        let start = core.rand.fastrand_n(self.owned.get_segment_size() as u32);
         self.owned.close_and_shutdown_all(start as usize);
 
         core.stats.submit(&self.worker_metrics[core.index]);
diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs
index 96ea40528d1..d5bf89cd38a 100644
--- a/tokio/src/runtime/task/list.rs
+++ b/tokio/src/runtime/task/list.rs
@@ -63,7 +63,6 @@ pub(crate) struct OwnedTasks<S: 'static> {
     pub(crate) id: NonZeroU64,
     closing: AtomicBool,
     shutdown: AtomicBool,
-    pub(crate) segment_size: u32,
     segment_mask: u32,
     count: AtomicUsize,
 }
@@ -98,7 +97,6 @@ impl<S: 'static> OwnedTasks<S> {
             closing: AtomicBool::new(false),
             shutdown: AtomicBool::new(false),
             id: get_next_id(),
-            segment_size,
             segment_mask,
             count: AtomicUsize::new(0),
         }
@@ -181,7 +179,7 @@ impl<S: 'static> OwnedTasks<S> {
         S: Schedule,
     {
         self.closing.store(true, Ordering::Release);
-        for i in start..self.segment_size as usize + start {
+        for i in start..self.get_segment_size() + start {
             loop {
                 let mut lock = self.segment_inner(i);
                 match lock.pop_back() {
@@ -198,6 +196,11 @@ impl<S: 'static> OwnedTasks<S> {
         self.shutdown.store(true, Ordering::Release)
     }
 
+    #[inline]
+    pub(crate) fn get_segment_size(&self) -> usize {
+        self.lists.len()
+    }
+
     pub(crate) fn active_tasks_count(&self) -> usize {
         self.count.load(Ordering::Relaxed)
     }
@@ -247,7 +250,7 @@ cfg_taskdump! {
             F: FnMut(&Task<S>),
         {
             // While tracing, new tasks cannot be added, so we acquire all the locks first.
-            let mut guards = Vec::with_capacity(self.segment_size as usize);
+            let mut guards = Vec::with_capacity(self.get_segment_size());
             for list in self.lists.as_ref() {
                 guards.push(list.lock());
             }
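
Note on the pattern, for reviewers: the patch drops the cached `segment_size` field and derives the shard count from `lists.len()` instead, so the count can never drift from the backing storage. Below is a minimal, self-contained Rust sketch of that idea; `ShardedList`, `push`, and `drain_all_from` are simplified stand-ins invented for illustration (only `get_segment_size` comes from the patch), not tokio's actual internals.

use std::sync::Mutex;

/// A toy sharded list: one lock-protected segment per shard.
struct ShardedList<T> {
    lists: Box<[Mutex<Vec<T>>]>,
    /// Maps a key to a segment; valid because the shard count is a power of two.
    segment_mask: usize,
}

impl<T> ShardedList<T> {
    fn new(segment_size: usize) -> Self {
        assert!(segment_size.is_power_of_two());
        let lists = (0..segment_size)
            .map(|_| Mutex::new(Vec::new()))
            .collect::<Vec<_>>()
            .into_boxed_slice();
        // No `segment_size` field: the count lives only in `lists`.
        Self { lists, segment_mask: segment_size - 1 }
    }

    /// The shard count, derived from the storage rather than cached.
    #[inline]
    fn get_segment_size(&self) -> usize {
        self.lists.len()
    }

    fn push(&self, key: usize, value: T) {
        self.lists[key & self.segment_mask].lock().unwrap().push(value);
    }

    /// Visits every segment exactly once, starting from an arbitrary index
    /// and wrapping around, the way `close_and_shutdown_all(start)` iterates
    /// in the patch.
    fn drain_all_from(&self, start: usize) -> Vec<T> {
        let mut out = Vec::new();
        for i in start..start + self.get_segment_size() {
            out.append(&mut self.lists[i & self.segment_mask].lock().unwrap());
        }
        out
    }
}

fn main() {
    let list = ShardedList::new(4);
    for key in 0..16 {
        list.push(key, key);
    }
    // A start index other than 0 still drains each segment exactly once.
    assert_eq!(list.drain_all_from(2).len(), 16);
    println!("drained all items across {} segments", list.get_segment_size());
}

The trade-off is one load of the boxed slice's length instead of a field read; with `#[inline]` this is effectively free, and it removes a field that had to be kept in sync with the allocation.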