diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index e8cff36b1cf..cb31e0fe887 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -264,7 +264,7 @@ fn shutdown2(mut core: Box, handle: &Handle) -> Box { drop(task); } - assert!(handle.shared.owned.is_closed()); + assert!(handle.shared.owned.is_shutdown()); assert!(handle.shared.owned.is_empty()); // Submit metrics @@ -547,10 +547,10 @@ cfg_metrics! { } cfg_unstable! { - use std::num::NonZeroU32; + use std::num::NonZeroU64; impl Handle { - pub(crate) fn owned_id(&self) -> NonZeroU32 { + pub(crate) fn owned_id(&self) -> NonZeroU64 { self.shared.owned.id } } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 957673d749f..568eb80af8b 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -60,10 +60,10 @@ impl Handle { } cfg_unstable! { - use std::num::NonZeroU32; + use std::num::NonZeroU64; impl Handle { - pub(crate) fn owned_id(&self) -> NonZeroU32 { + pub(crate) fn owned_id(&self) -> NonZeroU64 { self.shared.owned.id } } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 6e55f5df215..b94d1c08ba1 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -1160,7 +1160,7 @@ impl Handle { return; } - debug_assert!(self.shared.owned.is_closed()); + debug_assert!(self.shared.owned.is_shutdown()); debug_assert!(self.shared.owned.is_empty()); for mut core in cores.drain(..) { diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs b/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs index b7288fe2302..d746bca1a18 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs @@ -59,10 +59,10 @@ impl Handle { } cfg_unstable! { - use std::num::NonZeroU32; + use std::num::NonZeroU64; impl Handle { - pub(crate) fn owned_id(&self) -> NonZeroU32 { + pub(crate) fn owned_id(&self) -> NonZeroU64 { self.shared.owned.id } } diff --git a/tokio/src/runtime/task/id.rs b/tokio/src/runtime/task/id.rs index dd5ae504582..82c8a7e7e90 100644 --- a/tokio/src/runtime/task/id.rs +++ b/tokio/src/runtime/task/id.rs @@ -74,11 +74,22 @@ impl fmt::Display for Id { impl Id { pub(crate) fn next() -> Self { - use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64}; + use crate::loom::sync::atomic::Ordering::Relaxed; + use crate::loom::sync::atomic::StaticAtomicU64; - static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); + #[cfg(all(test, loom))] + { + crate::loom::lazy_static! { + static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); + } + Self(NEXT_ID.fetch_add(1, Relaxed)) + } - Self(NEXT_ID.fetch_add(1, Relaxed)) + #[cfg(not(all(test, loom)))] + { + static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); + Self(NEXT_ID.fetch_add(1, Relaxed)) + } } pub(crate) fn as_u64(&self) -> u64 { diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index 5b44f5c6e9b..023867066da 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -8,10 +8,10 @@ use crate::future::Future; use crate::loom::cell::UnsafeCell; -use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Mutex, MutexGuard}; use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task}; use crate::util::linked_list::{Link, LinkedList}; +use std::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::{AtomicBool, Ordering}; use std::marker::PhantomData; @@ -61,7 +61,8 @@ cfg_not_has_atomic_u64! { pub(crate) struct OwnedTasks { lists: Box<[Mutex>]>, pub(crate) id: NonZeroU64, - closed: AtomicBool, + closing: AtomicBool, + shutdown: AtomicBool, pub(crate) segment_size: u32, segment_mask: u32, count: AtomicUsize, @@ -94,7 +95,8 @@ impl OwnedTasks { } Self { lists: lists.into_boxed_slice(), - closed: AtomicBool::new(false), + closing: AtomicBool::new(false), + shutdown: AtomicBool::new(false), id: get_next_id(), segment_size, segment_mask, @@ -144,7 +146,7 @@ impl OwnedTasks { // check close flag, // it must be checked in the lock, for ensuring all tasks will shutdown after OwnedTasks has been closed - if self.closed.load(Ordering::Acquire) { + if self.closing.load(Ordering::Acquire) { drop(lock); task.shutdown(); return None; @@ -178,7 +180,7 @@ impl OwnedTasks { where S: Schedule, { - self.closed.store(true, Ordering::Release); + self.closing.store(true, Ordering::Release); for i in start..self.segment_size as usize + start { loop { let mut lock = self.segment_inner(i); @@ -192,6 +194,8 @@ impl OwnedTasks { }; } } + // we have shut down all tasks + self.shutdown.store(true, Ordering::Release) } pub(crate) fn active_tasks_count(&self) -> usize { @@ -226,8 +230,8 @@ impl OwnedTasks { self.lists[id & (self.segment_mask) as usize].lock() } - pub(crate) fn is_closed(&self) -> bool { - self.closed.load(Ordering::Acquire) + pub(crate) fn is_shutdown(&self) -> bool { + self.shutdown.load(Ordering::Acquire) } pub(crate) fn is_empty(&self) -> bool { diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 8db5aadb901..8eafb48ef89 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -315,7 +315,7 @@ impl Runtime { } drop(core); - assert!(self.0.owned.is_closed()); + assert!(self.0.owned.is_shutdown()); assert!(self.0.owned.is_empty()); } }