Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt(alt): fix a number of concurrency bugs #5907

Merged
merged 11 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ wasm-bindgen-test = "0.3.0"
mio-aio = { version = "0.7.0", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.6", features = ["futures", "checkpoint"] }
loom = { version = "0.7", features = ["futures", "checkpoint"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
1 change: 1 addition & 0 deletions tokio/src/loom/mocked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(crate) mod sync {
}

#[inline]
#[track_caller]
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
self.0.lock().unwrap()
}
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub struct Builder {
/// How many ticks before yielding to the driver for timer and I/O events?
pub(super) event_interval: u32,

pub(super) local_queue_capacity: usize,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR makes the local queue capacity configurable but does not expose the option. At some point, it might, but right now, it is for tests.


/// When true, the multi-threade scheduler LIFO slot should not be used.
///
/// This option should only be exposed as unstable.
Expand Down Expand Up @@ -297,6 +299,12 @@ impl Builder {
global_queue_interval: None,
event_interval,

#[cfg(not(loom))]
local_queue_capacity: 256,

#[cfg(loom)]
local_queue_capacity: 4,

seed_generator: RngSeedGenerator::new(RngSeed::new()),

#[cfg(tokio_unstable)]
Expand Down Expand Up @@ -1046,6 +1054,14 @@ impl Builder {
}
}

cfg_loom! {
pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
assert!(value.is_power_of_two());
self.local_queue_capacity = value;
self
}
}

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::scheduler::{self, CurrentThread};
use crate::runtime::{runtime::Scheduler, Config};
Expand Down Expand Up @@ -1074,6 +1090,7 @@ impl Builder {
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
Expand Down Expand Up @@ -1224,6 +1241,7 @@ cfg_rt_multi_thread! {
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
Expand Down Expand Up @@ -1271,6 +1289,7 @@ cfg_rt_multi_thread! {
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
Expand Down
8 changes: 7 additions & 1 deletion tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#![cfg_attr(any(not(feature = "full"), target_family = "wasm"), allow(dead_code))]
#![cfg_attr(
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]
use crate::runtime::Callback;
use crate::util::RngSeedGenerator;

Expand All @@ -9,6 +12,9 @@ pub(crate) struct Config {
/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,

/// How big to make each worker's local queue
pub(crate) local_queue_capacity: usize,

/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,

Expand Down
28 changes: 27 additions & 1 deletion tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

// Eventually, this file will see significant refactoring / cleanup. For now, we
// don't need to worry much about dead code with certain feature permutations.
#![cfg_attr(not(feature = "full"), allow(dead_code))]
#![cfg_attr(
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]

use crate::runtime::park::{ParkThread, UnparkThread};

Expand Down Expand Up @@ -58,6 +61,10 @@ impl Driver {
))
}

pub(crate) fn is_enabled(&self) -> bool {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is added so the scheduler can avoid the extra thread if there are no resources to drive.

self.inner.is_enabled()
}

pub(crate) fn park(&mut self, handle: &Handle) {
self.inner.park(handle)
}
Expand Down Expand Up @@ -154,6 +161,13 @@ cfg_io_driver! {
}

impl IoStack {
pub(crate) fn is_enabled(&self) -> bool {
match self {
IoStack::Enabled(..) => true,
IoStack::Disabled(..) => false,
}
}

pub(crate) fn park(&mut self, handle: &Handle) {
match self {
IoStack::Enabled(v) => v.park(handle),
Expand Down Expand Up @@ -217,6 +231,11 @@ cfg_not_io_driver! {
pub(crate) fn shutdown(&mut self, _handle: &Handle) {
self.0.shutdown();
}

/// This is not a "real" driver, so it is not considered enabled.
pub(crate) fn is_enabled(&self) -> bool {
false
}
}
}

Expand Down Expand Up @@ -298,6 +317,13 @@ cfg_time! {
}

impl TimeDriver {
pub(crate) fn is_enabled(&self) -> bool {
match self {
TimeDriver::Enabled { .. } => true,
TimeDriver::Disabled(inner) => inner.is_enabled(),
}
}

pub(crate) fn park(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, .. } => driver.park(handle),
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/inject/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ impl<T: 'static> Shared<T> {
pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
use std::cmp;

debug_assert!(n > 0);

// safety: All updates to the len atomic are guarded by the mutex. As
// such, a non-atomic load followed by a store is safe.
let len = self.len.unsync_load();
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/runtime/scheduler/inject/synced.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#![cfg_attr(
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]

use crate::runtime::task;

pub(crate) struct Synced {
Expand Down Expand Up @@ -29,4 +34,8 @@ impl Synced {
// safety: a `Notified` is pushed into the queue and now it is popped!
Some(unsafe { task::Notified::from_raw(task) })
}

pub(crate) fn is_empty(&self) -> bool {
self.head.is_none()
}
}
28 changes: 13 additions & 15 deletions tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ impl Idle {
(idle, synced)
}

pub(super) fn needs_searching(&self) -> bool {
self.needs_searching.load(Acquire)
}

pub(super) fn num_idle(&self, synced: &Synced) -> usize {
#[cfg(not(loom))]
debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
synced.available_cores.len()
}
Expand Down Expand Up @@ -131,13 +136,7 @@ impl Idle {
}

// We need to establish a stronger barrier than with `notify_local`
if self
.num_searching
.compare_exchange(0, 1, AcqRel, Acquire)
.is_err()
{
return;
}
self.num_searching.fetch_add(1, AcqRel);

self.notify_synced(synced, shared);
}
Expand All @@ -158,6 +157,7 @@ impl Idle {
synced.assigned_cores[worker] = Some(core);

let num_idle = synced.idle.available_cores.len();
#[cfg(not(loom))]
debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);

// Update the number of sleeping workers
Expand Down Expand Up @@ -221,6 +221,7 @@ impl Idle {
let num_idle = synced.idle.available_cores.len();
self.num_idle.store(num_idle, Release);
} else {
#[cfg(not(loom))]
debug_assert_eq!(
synced.idle.available_cores.len(),
self.num_idle.load(Acquire)
Expand Down Expand Up @@ -260,11 +261,11 @@ impl Idle {
// The core should not be searching at this point
debug_assert!(!core.is_searching);

// Check that this isn't the final worker to go idle *and*
// `needs_searching` is set.
debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced.idle) > 1);
// Check that there are no pending tasks in the global queue
debug_assert!(synced.inject.is_empty());

let num_idle = synced.idle.available_cores.len();
#[cfg(not(loom))]
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));

self.idle_map.set(core.index);
Expand Down Expand Up @@ -314,7 +315,7 @@ impl Idle {
}
}

fn transition_worker_to_searching(&self, core: &mut Core) {
pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
core.is_searching = true;
self.num_searching.fetch_add(1, AcqRel);
self.needs_searching.store(false, Release);
Expand All @@ -324,10 +325,7 @@ impl Idle {
///
/// Returns `true` if this is the final searching worker. The caller
/// **must** notify a new worker.
pub(super) fn transition_worker_from_searching(&self, core: &mut Core) -> bool {
debug_assert!(core.is_searching);
core.is_searching = false;

pub(super) fn transition_worker_from_searching(&self) -> bool {
let prev = self.num_searching.fetch_sub(1, AcqRel);
debug_assert!(prev > 0);

Expand Down
Loading