Skip to content

Commit

Permalink
runtime: reduce the lock contention in task spawn (#6001)
Browse files Browse the repository at this point in the history
  • Loading branch information
wathenjiang authored Dec 7, 2023
1 parent a0a58d7 commit 3a4aef1
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 142 deletions.
1 change: 1 addition & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
criterion = "0.5.1"
rand = "0.8"
rand_chacha = "0.3"
num_cpus = "1.16.0"

[dev-dependencies]
tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
Expand Down
1 change: 0 additions & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,6 @@ cfg_rt_multi_thread! {
use crate::runtime::scheduler::MultiThreadAlt;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
Expand Down
8 changes: 7 additions & 1 deletion tokio/src/runtime/id.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fmt;
use std::num::NonZeroU64;
use std::num::{NonZeroU32, NonZeroU64};

/// An opaque ID that uniquely identifies a runtime relative to all other currently
/// running runtimes.
Expand Down Expand Up @@ -39,6 +39,12 @@ impl From<NonZeroU64> for Id {
}
}

impl From<NonZeroU32> for Id {
fn from(value: NonZeroU32) -> Self {
Id(value.into())
}
}

impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl CurrentThread {
let handle = Arc::new(Handle {
shared: Shared {
inject: Inject::new(),
owned: OwnedTasks::new(),
owned: OwnedTasks::new(1),
woken: AtomicBool::new(false),
config,
scheduler_metrics: SchedulerMetrics::new(),
Expand Down Expand Up @@ -248,7 +248,7 @@ fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
handle.shared.owned.close_and_shutdown_all();
handle.shared.owned.close_and_shutdown_all(0);

// Drain local queue
// We already shut down every task, so we just need to drop the task.
Expand Down Expand Up @@ -614,7 +614,7 @@ impl Schedule for Arc<Handle> {
// If `None`, the runtime is shutting down, so there is no need to signal shutdown
if let Some(core) = core.as_mut() {
core.unhandled_panic = true;
self.shared.owned.close_and_shutdown_all();
self.shared.owned.close_and_shutdown_all(0);
}
}
_ => unreachable!("runtime core not set in CURRENT thread-local"),
Expand Down
13 changes: 10 additions & 3 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(),
owned: OwnedTasks::new(size),
synced: Mutex::new(Synced {
idle: idle_synced,
inject: inject_synced,
Expand Down Expand Up @@ -548,7 +548,6 @@ impl Context {
}

core.pre_shutdown(&self.worker);

// Signal shutdown
self.worker.handle.shutdown_core(core);
Err(())
Expand Down Expand Up @@ -955,8 +954,16 @@ impl Core {
/// Signals all tasks to shut down, and waits for them to complete. Must run
/// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&mut self, worker: &Worker) {
// Start from a random inner list
let start = self
.rand
.fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
// Signal to all tasks to shut down.
worker.handle.shared.owned.close_and_shutdown_all();
worker
.handle
.shared
.owned
.close_and_shutdown_all(start as usize);

self.stats
.submit(&worker.handle.shared.worker_metrics[worker.index]);
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(),
owned: OwnedTasks::new(num_cores),
synced: Mutex::new(Synced {
assigned_cores: (0..num_workers).map(|_| None).collect(),
shutdown_cores: Vec::with_capacity(num_cores),
Expand Down Expand Up @@ -1460,7 +1460,9 @@ impl Shared {
}

pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
self.owned.close_and_shutdown_all();
// Start from a random inner list
let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
self.owned.close_and_shutdown_all(start as usize);

core.stats.submit(&self.worker_metrics[core.index]);

Expand Down
19 changes: 15 additions & 4 deletions tokio/src/runtime/task/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::fmt;
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct Id(u64);
pub struct Id(pub(crate) u64);

/// Returns the [`Id`] of the currently running task.
///
Expand Down Expand Up @@ -74,11 +74,22 @@ impl fmt::Display for Id {

impl Id {
pub(crate) fn next() -> Self {
use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::atomic::StaticAtomicU64;

static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
#[cfg(all(test, loom))]
{
crate::loom::lazy_static! {
static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
}
Self(NEXT_ID.fetch_add(1, Relaxed))
}

Self(NEXT_ID.fetch_add(1, Relaxed))
#[cfg(not(all(test, loom)))]
{
static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
Self(NEXT_ID.fetch_add(1, Relaxed))
}
}

pub(crate) fn as_u64(&self) -> u64 {
Expand Down
110 changes: 63 additions & 47 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Mutex;
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::sharded_list;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use std::marker::PhantomData;
use std::num::NonZeroU64;

Expand All @@ -25,7 +26,7 @@ use std::num::NonZeroU64;
// mixed up runtimes happen to have the same id.

cfg_has_atomic_u64! {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::AtomicU64;

static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);

Expand All @@ -40,7 +41,7 @@ cfg_has_atomic_u64! {
}

cfg_not_has_atomic_u64! {
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::AtomicU32;

static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);

Expand All @@ -55,30 +56,30 @@ cfg_not_has_atomic_u64! {
}

pub(crate) struct OwnedTasks<S: 'static> {
inner: Mutex<CountedOwnedTasksInner<S>>,
list: List<S>,
pub(crate) id: NonZeroU64,
closed: AtomicBool,
}
struct CountedOwnedTasksInner<S: 'static> {
list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
}

type List<S> = sharded_list::ShardedList<Task<S>, <Task<S> as Link>::Target>;

pub(crate) struct LocalOwnedTasks<S: 'static> {
inner: UnsafeCell<OwnedTasksInner<S>>,
pub(crate) id: NonZeroU64,
_not_send_or_sync: PhantomData<*const ()>,
}

struct OwnedTasksInner<S: 'static> {
list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
}

impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
pub(crate) fn new(num_cores: usize) -> Self {
let shard_size = Self::gen_shared_list_size(num_cores);
Self {
inner: Mutex::new(CountedOwnedTasksInner {
list: CountedLinkedList::new(),
closed: false,
}),
list: List::new(shard_size),
closed: AtomicBool::new(false),
id: get_next_id(),
}
}
Expand Down Expand Up @@ -112,24 +113,23 @@ impl<S: 'static> OwnedTasks<S> {
task.header().set_owner_id(self.id);
}

let mut lock = self.inner.lock();
if lock.closed {
drop(lock);
drop(notified);
let shard = self.list.lock_shard(&task);
// Check the closed flag in the lock for ensuring all that tasks
// will shut down after the OwnedTasks has been closed.
if self.closed.load(Ordering::Acquire) {
drop(shard);
task.shutdown();
None
} else {
lock.list.push_front(task);
Some(notified)
return None;
}
shard.push(task);
Some(notified)
}

/// Asserts that the given task is owned by this OwnedTasks and convert it to
/// a LocalNotified, giving the thread permission to poll this task.
#[inline]
pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
debug_assert_eq!(task.header().get_owner_id(), Some(self.id));

// safety: All tasks bound to this OwnedTasks are Send, so it is safe
// to poll it on this thread no matter what thread we are on.
LocalNotified {
Expand All @@ -140,34 +140,34 @@ impl<S: 'static> OwnedTasks<S> {

/// Shuts down all tasks in the collection. This call also closes the
/// collection, preventing new items from being added.
pub(crate) fn close_and_shutdown_all(&self)
///
/// The parameter start determines which shard this method will start at.
/// Using different values for each worker thread reduces contention.
pub(crate) fn close_and_shutdown_all(&self, start: usize)
where
S: Schedule,
{
// The first iteration of the loop was unrolled so it can set the
// closed bool.
let first_task = {
let mut lock = self.inner.lock();
lock.closed = true;
lock.list.pop_back()
};
match first_task {
Some(task) => task.shutdown(),
None => return,
self.closed.store(true, Ordering::Release);
for i in start..self.get_shard_size() + start {
loop {
let task = self.list.pop_back(i);
match task {
Some(task) => {
task.shutdown();
}
None => break,
}
}
}
}

loop {
let task = match self.inner.lock().list.pop_back() {
Some(task) => task,
None => return,
};

task.shutdown();
}
#[inline]
pub(crate) fn get_shard_size(&self) -> usize {
self.list.shard_size()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.inner.lock().list.count()
self.list.len()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
Expand All @@ -179,11 +179,27 @@ impl<S: 'static> OwnedTasks<S> {

// safety: We just checked that the provided task is not in some other
// linked list.
unsafe { self.inner.lock().list.remove(task.header_ptr()) }
unsafe { self.list.remove(task.header_ptr()) }
}

pub(crate) fn is_empty(&self) -> bool {
self.inner.lock().list.is_empty()
self.list.is_empty()
}

/// Generates the size of the sharded list based on the number of worker threads.
///
/// The sharded lock design can effectively alleviate
/// lock contention performance problems caused by high concurrency.
///
/// However, as the number of shards increases, the memory continuity between
/// nodes in the intrusive linked list will diminish. Furthermore,
/// the construction time of the sharded list will also increase with a higher number of shards.
///
/// Due to the above reasons, we set a maximum value for the shared list size,
/// denoted as `MAX_SHARED_LIST_SIZE`.
fn gen_shared_list_size(num_cores: usize) -> usize {
const MAX_SHARED_LIST_SIZE: usize = 1 << 16;
usize::min(MAX_SHARED_LIST_SIZE, num_cores.next_power_of_two() * 4)
}
}

Expand All @@ -192,9 +208,9 @@ cfg_taskdump! {
/// Locks the tasks, and calls `f` on an iterator over them.
pub(crate) fn for_each<F>(&self, f: F)
where
F: FnMut(&Task<S>)
F: FnMut(&Task<S>),
{
self.inner.lock().list.for_each(f)
self.list.for_each(f);
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions tokio/src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ cfg_taskdump! {

use crate::future::Future;
use crate::util::linked_list;
use crate::util::sharded_list;

use std::marker::PhantomData;
use std::ptr::NonNull;
Expand Down Expand Up @@ -503,3 +504,16 @@ unsafe impl<S> linked_list::Link for Task<S> {
self::core::Trailer::addr_of_owned(Header::get_trailer(target))
}
}

/// # Safety
///
/// The id of a task is never changed after creation of the task, so the return value of
/// `get_shard_id` will not change. (The cast may throw away the upper 32 bits of the task id, but
/// the shard id still won't change from call to call.)
unsafe impl<S> sharded_list::ShardedListItem for Task<S> {
unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize {
// SAFETY: The caller guarantees that `target` points at a valid task.
let task_id = unsafe { Header::get_id(target) };
task_id.0 as usize
}
}
Loading

0 comments on commit 3a4aef1

Please sign in to comment.