Skip to content

Commit

Permalink
refactor: use local_staic in loom
Browse files Browse the repository at this point in the history
  • Loading branch information
wathenjiang committed Sep 28, 2023
1 parent 63a7679 commit b2010d7
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 19 deletions.
6 changes: 3 additions & 3 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
drop(task);
}

assert!(handle.shared.owned.is_closed());
assert!(handle.shared.owned.is_shutdown());
assert!(handle.shared.owned.is_empty());

// Submit metrics
Expand Down Expand Up @@ -547,10 +547,10 @@ cfg_metrics! {
}

cfg_unstable! {
use std::num::NonZeroU32;
use std::num::NonZeroU64;

impl Handle {
pub(crate) fn owned_id(&self) -> NonZeroU32 {
pub(crate) fn owned_id(&self) -> NonZeroU64 {
self.shared.owned.id
}
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ impl Handle {
}

cfg_unstable! {
use std::num::NonZeroU32;
use std::num::NonZeroU64;

impl Handle {
pub(crate) fn owned_id(&self) -> NonZeroU32 {
pub(crate) fn owned_id(&self) -> NonZeroU64 {
self.shared.owned.id
}
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ impl Handle {
return;
}

debug_assert!(self.shared.owned.is_closed());
debug_assert!(self.shared.owned.is_shutdown());
debug_assert!(self.shared.owned.is_empty());

for mut core in cores.drain(..) {
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ impl Handle {
}

cfg_unstable! {
use std::num::NonZeroU32;
use std::num::NonZeroU64;

impl Handle {
pub(crate) fn owned_id(&self) -> NonZeroU32 {
pub(crate) fn owned_id(&self) -> NonZeroU64 {
self.shared.owned.id
}
}
Expand Down
17 changes: 14 additions & 3 deletions tokio/src/runtime/task/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,22 @@ impl fmt::Display for Id {

impl Id {
pub(crate) fn next() -> Self {
use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::atomic::StaticAtomicU64;

static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
#[cfg(all(test, loom))]
{
crate::loom::lazy_static! {
static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
}
Self(NEXT_ID.fetch_add(1, Relaxed))
}

Self(NEXT_ID.fetch_add(1, Relaxed))
#[cfg(not(all(test, loom)))]
{
static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
Self(NEXT_ID.fetch_add(1, Relaxed))
}
}

pub(crate) fn as_u64(&self) -> u64 {
Expand Down
18 changes: 11 additions & 7 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};
use std::sync::atomic::AtomicUsize;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use std::marker::PhantomData;
Expand Down Expand Up @@ -61,7 +61,8 @@ cfg_not_has_atomic_u64! {
pub(crate) struct OwnedTasks<S: 'static> {
lists: Box<[Mutex<ListSement<S>>]>,
pub(crate) id: NonZeroU64,
closed: AtomicBool,
closing: AtomicBool,
shutdown: AtomicBool,
pub(crate) segment_size: u32,
segment_mask: u32,
count: AtomicUsize,
Expand Down Expand Up @@ -94,7 +95,8 @@ impl<S: 'static> OwnedTasks<S> {
}
Self {
lists: lists.into_boxed_slice(),
closed: AtomicBool::new(false),
closing: AtomicBool::new(false),
shutdown: AtomicBool::new(false),
id: get_next_id(),
segment_size,
segment_mask,
Expand Down Expand Up @@ -144,7 +146,7 @@ impl<S: 'static> OwnedTasks<S> {

// check close flag,
// it must be checked in the lock, for ensuring all tasks will shutdown after OwnedTasks has been closed
if self.closed.load(Ordering::Acquire) {
if self.closing.load(Ordering::Acquire) {
drop(lock);
task.shutdown();
return None;
Expand Down Expand Up @@ -178,7 +180,7 @@ impl<S: 'static> OwnedTasks<S> {
where
S: Schedule,
{
self.closed.store(true, Ordering::Release);
self.closing.store(true, Ordering::Release);
for i in start..self.segment_size as usize + start {
loop {
let mut lock = self.segment_inner(i);
Expand All @@ -192,6 +194,8 @@ impl<S: 'static> OwnedTasks<S> {
};
}
}
// we have shut down all tasks
self.shutdown.store(true, Ordering::Release)
}

pub(crate) fn active_tasks_count(&self) -> usize {
Expand Down Expand Up @@ -226,8 +230,8 @@ impl<S: 'static> OwnedTasks<S> {
self.lists[id & (self.segment_mask) as usize].lock()
}

pub(crate) fn is_closed(&self) -> bool {
self.closed.load(Ordering::Acquire)
pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::Acquire)
}

pub(crate) fn is_empty(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/tests/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl Runtime {
}

drop(core);
assert!(self.0.owned.is_closed());
assert!(self.0.owned.is_shutdown());
assert!(self.0.owned.is_empty());
}
}
Expand Down

0 comments on commit b2010d7

Please sign in to comment.