Skip to content

Commit

Permalink
add shutdown flag for OwnedTasks
Browse files Browse the repository at this point in the history
  • Loading branch information
wathenjiang committed Sep 28, 2023
1 parent ca60be6 commit e7a3d23
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
drop(task);
}

assert!(handle.shared.owned.is_closed());
assert!(handle.shared.owned.is_shutdown());
assert!(handle.shared.owned.is_empty());

// Submit metrics
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ impl Handle {
return;
}

debug_assert!(self.shared.owned.is_closed());
debug_assert!(self.shared.owned.is_shutdown());
debug_assert!(self.shared.owned.is_empty());

for mut core in cores.drain(..) {
Expand Down
18 changes: 11 additions & 7 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ fn get_next_id() -> NonZeroU32 {
pub(crate) struct OwnedTasks<S: 'static> {
lists: Box<[Mutex<ListSement<S>>]>,
pub(crate) id: NonZeroU32,
closed: AtomicBool,
closing: AtomicBool,
shutdown: AtomicBool,
pub(crate) segment_size: u32,
segment_mask: u32,
count: AtomicUsize,
Expand Down Expand Up @@ -73,7 +74,8 @@ impl<S: 'static> OwnedTasks<S> {
}
Self {
lists: lists.into_boxed_slice(),
closed: AtomicBool::new(false),
closing: AtomicBool::new(false),
shutdown: AtomicBool::new(false),
id: get_next_id(),
segment_size,
segment_mask,
Expand Down Expand Up @@ -116,9 +118,9 @@ impl<S: 'static> OwnedTasks<S> {
task.header().set_owner(self.id, index as u32);
}

// check close flag,
// check closing flag, if it is true, we are closing the OwnedTasks
// it must be checked in the lock, for ensuring all tasks will shutdown after OwnedTasks has been closed
if self.closed.load(Ordering::Acquire) {
if self.closing.load(Ordering::Acquire) {
drop(lock);
task.shutdown();
return None;
Expand Down Expand Up @@ -153,7 +155,7 @@ impl<S: 'static> OwnedTasks<S> {
where
S: Schedule,
{
self.closed.store(true, Ordering::Release);
self.closing.store(true, Ordering::Release);
for i in start..self.segment_size as usize + start {
loop {
let (mut lock, _) = self.segment_inner(i);
Expand All @@ -167,6 +169,8 @@ impl<S: 'static> OwnedTasks<S> {
};
}
}
// we has shut down all tasks
self.shutdown.store(true, Ordering::Release);
}

pub(crate) fn active_tasks_count(&self) -> usize {
Expand Down Expand Up @@ -195,8 +199,8 @@ impl<S: 'static> OwnedTasks<S> {
(self.lists[index].lock(), index)
}

pub(crate) fn is_closed(&self) -> bool {
self.closed.load(Ordering::Acquire)
pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::Acquire)
}

pub(crate) fn is_empty(&self) -> bool {
Expand Down

0 comments on commit e7a3d23

Please sign in to comment.