From 8546ff826db8dba1e39b4119ad909fb6cab2492a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 21 Nov 2019 23:28:39 -0800 Subject: [PATCH] runtime: cleanup and add config options (#1807) * runtime: cleanup and add config options This patch finishes the cleanup as part of the transition to Tokio 0.2. A number of changes were made to take advantage of having all Tokio types in a single crate. Also, fixes using Tokio types from `spawn_blocking`. * Many threads, one resource driver Previously, in the threaded scheduler, a resource driver (mio::Poll / timer combo) was created per thread. This was more or less fine, except it required balancing across the available drivers. When using a resource driver from **outside** of the thread pool, balancing is tricky. The change was originally done to avoid having a dedicated driver thread. Now, instead of creating many resource drivers, a single resource driver is used. Each scheduler thread will attempt to "lock" the resource driver before parking on it. If the resource driver is already locked, the thread uses a condition variable to park. Contention should remain low as, under load, the scheduler avoids using the drivers. * Add configuration options to enable I/O / time New configuration options are added to `runtime::Builder` to allow enabling I/O and time drivers on a runtime instance basis. This is useful when wanting to create lightweight runtime instances to execute compute only tasks. * Bug fixes The condition variable parker is updated to the same algorithm used in `std`. This is motivated by some potential deadlock cases discovered by `loom`. The basic scheduler is fixed to fairly schedule tasks. `push_front` was accidentally used instead of `push_back`. I/O, time, and spawning now work from within `spawn_blocking` closures. * Misc cleanup The threaded scheduler is no longer generic over `P: Park`. Instead, it is hard coded to a specific parker. Tests, including loom tests, are updated to use `Runtime` directly. 
This provides greater coverage. The `blocking` module is moved back into `runtime` as all usage is within `runtime` itself. --- tokio-macros/src/lib.rs | 2 + tokio-test/src/io.rs | 2 + tokio-test/src/lib.rs | 7 +- tokio/Cargo.toml | 2 +- tokio/src/fs/mod.rs | 4 +- tokio/src/io/driver/mod.rs | 2 +- tokio/src/io/mod.rs | 4 +- tokio/src/lib.rs | 5 +- tokio/src/loom/std/mod.rs | 4 +- tokio/src/macros/cfg.rs | 8 +- tokio/src/net/addr.rs | 8 +- tokio/src/net/mod.rs | 2 + tokio/src/park/either.rs | 65 ++++ tokio/src/{runtime => }/park/mod.rs | 16 +- tokio/src/park/thread.rs | 316 ++++++++++++++++++ tokio/src/prelude.rs | 2 + tokio/src/runtime/basic_scheduler.rs | 4 +- tokio/src/runtime/blocking.rs | 41 --- tokio/src/runtime/blocking/mod.rs | 63 ++++ tokio/src/{ => runtime}/blocking/pool.rs | 235 ++++++++----- tokio/src/{ => runtime}/blocking/schedule.rs | 0 tokio/src/runtime/blocking/shutdown.rs | 44 +++ tokio/src/{ => runtime}/blocking/task.rs | 0 tokio/src/runtime/builder.rs | 218 ++++++------ tokio/src/runtime/enter.rs | 50 +-- tokio/src/runtime/handle.rs | 41 +-- tokio/src/runtime/io.rs | 34 +- tokio/src/runtime/mod.rs | 33 +- tokio/src/runtime/park.rs | 225 +++++++++++++ tokio/src/runtime/park/thread.rs | 280 ---------------- tokio/src/runtime/shell.rs | 12 +- tokio/src/runtime/spawner.rs | 53 +++ tokio/src/runtime/tests/mock_park.rs | 66 ---- tokio/src/runtime/tests/mod.rs | 3 - tokio/src/runtime/thread_pool/current.rs | 9 +- tokio/src/runtime/thread_pool/mod.rs | 95 ++---- tokio/src/runtime/thread_pool/owned.rs | 21 +- tokio/src/runtime/thread_pool/shared.rs | 32 +- tokio/src/runtime/thread_pool/slice.rs | 76 ++--- tokio/src/runtime/thread_pool/spawner.rs | 7 +- .../runtime/thread_pool/tests/loom_pool.rs | 101 +----- .../runtime/thread_pool/tests/loom_queue.rs | 3 +- tokio/src/runtime/thread_pool/tests/mod.rs | 3 - tokio/src/runtime/thread_pool/tests/pool.rs | 206 ------------ tokio/src/runtime/thread_pool/worker.rs | 233 ++++++------- tokio/src/runtime/time.rs | 
37 +- tokio/src/sync/mod.rs | 10 +- tokio/src/task/blocking.rs | 3 +- tokio/src/task/mod.rs | 14 - tokio/src/task/raw.rs | 12 - tokio/src/task/stack.rs | 2 +- tokio/src/task/state.rs | 8 - tokio/src/task/tests/task.rs | 14 +- tokio/src/time/driver/mod.rs | 2 +- tokio/src/time/mod.rs | 2 + tokio/src/time/tests/mock_clock.rs | 2 +- tokio/src/util/mod.rs | 3 + tokio/src/util/try_lock.rs | 63 ++++ tokio/tests/io_driver.rs | 6 +- tokio/tests/io_driver_drop.rs | 12 +- tokio/tests/process_issue_42.rs | 6 +- tokio/tests/rt_basic.rs | 1 + tokio/tests/rt_common.rs | 160 ++++++++- tokio/tests/rt_threaded.rs | 13 +- tokio/tests/signal_drop_rt.rs | 1 + tokio/tests/signal_multi_rt.rs | 1 + tokio/tests/time_rt.rs | 6 +- 67 files changed, 1657 insertions(+), 1358 deletions(-) create mode 100644 tokio/src/park/either.rs rename tokio/src/{runtime => }/park/mod.rs (95%) create mode 100644 tokio/src/park/thread.rs delete mode 100644 tokio/src/runtime/blocking.rs create mode 100644 tokio/src/runtime/blocking/mod.rs rename tokio/src/{ => runtime}/blocking/pool.rs (54%) rename tokio/src/{ => runtime}/blocking/schedule.rs (100%) create mode 100644 tokio/src/runtime/blocking/shutdown.rs rename tokio/src/{ => runtime}/blocking/task.rs (100%) create mode 100644 tokio/src/runtime/park.rs delete mode 100644 tokio/src/runtime/park/thread.rs create mode 100644 tokio/src/runtime/spawner.rs delete mode 100644 tokio/src/runtime/tests/mock_park.rs delete mode 100644 tokio/src/runtime/thread_pool/tests/pool.rs create mode 100644 tokio/src/util/try_lock.rs diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 62c8d94ba8a..65c8fdee8d8 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -110,6 +110,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { fn #name(#inputs) #ret { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() .block_on(async { #body }) @@ -211,6 +212,7 @@ pub fn test(args: TokenStream, item: 
TokenStream) -> TokenStream { fn #name() #ret { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() .block_on(async { #body }) diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index 5a2b74bf71c..e6a243a18a5 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -1,3 +1,5 @@ +#![cfg(not(loom))] + //! A mock type implementing [`AsyncRead`] and [`AsyncWrite`]. //! //! diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index bdd4a9f97c4..d70a0c22ffa 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -14,6 +14,7 @@ //! Tokio and Futures based testing utilites pub mod io; + mod macros; pub mod task; @@ -27,7 +28,11 @@ pub mod task; pub fn block_on(future: F) -> F::Output { use tokio::runtime; - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); rt.block_on(future) } diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 0619a50fe97..fdec4695624 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -99,7 +99,7 @@ fnv = { version = "1.0.6", optional = true } futures-core = { version = "0.3.0", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } -mio = { version = "0.6.14", optional = true } +mio = { version = "0.6.20", optional = true } num_cpus = { version = "1.8.0", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs index c9e4e637e71..1ded892c93c 100644 --- a/tokio/src/fs/mod.rs +++ b/tokio/src/fs/mod.rs @@ -1,3 +1,5 @@ +#![cfg(not(loom))] + //! Asynchronous file and standard stream adaptation. //! //! 
This module contains utility methods and adapter types for input/output to @@ -96,6 +98,6 @@ mod sys { pub(crate) use std::fs::File; // TODO: don't rename - pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::runtime::spawn_blocking as run; pub(crate) use crate::task::JoinHandle as Blocking; } diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 4f47dc3895a..bb784541d41 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -4,7 +4,7 @@ mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::loom::sync::atomic::AtomicUsize; -use crate::runtime::{Park, Unpark}; +use crate::park::{Park, Unpark}; use crate::util::slab::{Address, Slab}; use mio::event::Evented; diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 86f2653b7ff..627e643fce1 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -1,3 +1,5 @@ +#![cfg_attr(loom, allow(dead_code, unreachable_pub))] + //! Asynchronous I/O. //! //! This module is the asynchronous version of `std::io`. Primarily, it @@ -204,7 +206,7 @@ cfg_io_blocking! { /// Types in this module can be mocked out in tests. mod sys { // TODO: don't rename - pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::runtime::spawn_blocking as run; pub(crate) use crate::task::JoinHandle as Blocking; } } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 88290f98a24..a0f1c194406 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -208,9 +208,6 @@ #[macro_use] mod macros; -// Blocking task implementation -pub(crate) mod blocking; - cfg_fs! { pub mod fs; } @@ -218,10 +215,10 @@ cfg_fs! 
{ mod future; pub mod io; - pub mod net; mod loom; +mod park; pub mod prelude; diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 2c5b7eaa3e9..e6faa3b1692 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -1,4 +1,4 @@ -#![cfg_attr(not(feature = "full"), allow(unused_imports, dead_code))] +#![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))] mod atomic_u32; mod atomic_u64; @@ -45,8 +45,8 @@ pub(crate) mod sync { pub(crate) use crate::loom::std::atomic_u64::AtomicU64; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; - pub(crate) use std::sync::atomic::spin_loop_hint; pub(crate) use std::sync::atomic::{fence, AtomicPtr}; + pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool}; } } diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 5e84a3ac192..a3146688d37 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -1,8 +1,12 @@ #![allow(unused_macros)] -macro_rules! cfg_atomic_waker { +macro_rules! cfg_resource_drivers { ($($item:item)*) => { - $( #[cfg(any(feature = "io-driver", feature = "time"))] $item )* + $( + #[cfg(any(feature = "io-driver", feature = "time"))] + #[cfg(not(loom))] + $item + )* } } diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs index 108557e4d36..aa66c5fa0a6 100644 --- a/tokio/src/net/addr.rs +++ b/tokio/src/net/addr.rs @@ -124,7 +124,7 @@ cfg_dns! { type Future = sealed::MaybeReady; fn to_socket_addrs(&self) -> Self::Future { - use crate::blocking; + use crate::runtime::spawn_blocking; use sealed::MaybeReady; // First check if the input parses as a socket address @@ -137,7 +137,7 @@ cfg_dns! { // Run DNS lookup on the blocking pool let s = self.to_owned(); - MaybeReady::Blocking(blocking::spawn_blocking(move || { + MaybeReady::Blocking(spawn_blocking(move || { std::net::ToSocketAddrs::to_socket_addrs(&s) })) } @@ -152,7 +152,7 @@ cfg_dns! 
{ type Future = sealed::MaybeReady; fn to_socket_addrs(&self) -> Self::Future { - use crate::blocking; + use crate::runtime::spawn_blocking; use sealed::MaybeReady; let (host, port) = *self; @@ -174,7 +174,7 @@ cfg_dns! { let host = host.to_owned(); - MaybeReady::Blocking(blocking::spawn_blocking(move || { + MaybeReady::Blocking(spawn_blocking(move || { std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port)) })) } diff --git a/tokio/src/net/mod.rs b/tokio/src/net/mod.rs index f02b6259d38..ac913b21488 100644 --- a/tokio/src/net/mod.rs +++ b/tokio/src/net/mod.rs @@ -1,3 +1,5 @@ +#![cfg(not(loom))] + //! TCP/UDP/Unix bindings for `tokio`. //! //! This module contains the TCP/UDP/Unix networking types, similar to the standard diff --git a/tokio/src/park/either.rs b/tokio/src/park/either.rs new file mode 100644 index 00000000000..67f1e172744 --- /dev/null +++ b/tokio/src/park/either.rs @@ -0,0 +1,65 @@ +use crate::park::{Park, Unpark}; + +use std::fmt; +use std::time::Duration; + +pub(crate) enum Either { + A(A), + B(B), +} + +impl Park for Either +where + A: Park, + B: Park, +{ + type Unpark = Either; + type Error = Either; + + fn unpark(&self) -> Self::Unpark { + match self { + Either::A(a) => Either::A(a.unpark()), + Either::B(b) => Either::B(b.unpark()), + } + } + + fn park(&mut self) -> Result<(), Self::Error> { + match self { + Either::A(a) => a.park().map_err(Either::A), + Either::B(b) => b.park().map_err(Either::B), + } + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + match self { + Either::A(a) => a.park_timeout(duration).map_err(Either::A), + Either::B(b) => b.park_timeout(duration).map_err(Either::B), + } + } +} + +impl Unpark for Either +where + A: Unpark, + B: Unpark, +{ + fn unpark(&self) { + match self { + Either::A(a) => a.unpark(), + Either::B(b) => b.unpark(), + } + } +} + +impl fmt::Debug for Either +where + A: fmt::Debug, + B: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + 
match self { + Either::A(a) => a.fmt(fmt), + Either::B(b) => b.fmt(fmt), + } + } +} diff --git a/tokio/src/runtime/park/mod.rs b/tokio/src/park/mod.rs similarity index 95% rename from tokio/src/runtime/park/mod.rs rename to tokio/src/park/mod.rs index bae96d9dd3b..e6b0c72bbcc 100644 --- a/tokio/src/runtime/park/mod.rs +++ b/tokio/src/park/mod.rs @@ -44,12 +44,18 @@ //! [up]: trait.Unpark.html //! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html +cfg_resource_drivers! { + mod either; + pub(crate) use self::either::Either; +} + mod thread; -#[cfg(feature = "rt-threaded")] -pub(crate) use self::thread::CachedParkThread; -#[cfg(not(feature = "io-driver"))] pub(crate) use self::thread::ParkThread; +cfg_blocking_impl! { + pub(crate) use self::thread::CachedParkThread; +} + use std::sync::Arc; use std::time::Duration; @@ -58,7 +64,7 @@ use std::time::Duration; /// See [module documentation][mod] for more details. /// /// [mod]: ../index.html -pub trait Park { +pub(crate) trait Park { /// Unpark handle type for the `Park` implementation. type Unpark: Unpark; @@ -112,7 +118,7 @@ pub trait Park { /// /// [mod]: ../index.html /// [`Park`]: trait.Park.html -pub trait Unpark: Sync + Send + 'static { +pub(crate) trait Unpark: Sync + Send + 'static { /// Unblock a thread that is blocked by the associated `Park` handle. /// /// Calling `unpark` atomically makes available the unpark token, if it is diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs new file mode 100644 index 00000000000..dc844871e11 --- /dev/null +++ b/tokio/src/park/thread.rs @@ -0,0 +1,316 @@ +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::park::{Park, Unpark}; + +use std::sync::atomic::Ordering::SeqCst; +use std::time::Duration; + +#[derive(Debug)] +pub(crate) struct ParkThread { + inner: Arc, +} + +/// Error returned by [`ParkThread`] +/// +/// This currently is never returned, but might at some point in the future. 
+/// +/// [`ParkThread`]: struct.ParkThread.html +#[derive(Debug)] +pub(crate) struct ParkError { + _p: (), +} + +/// Unblocks a thread that was blocked by `ParkThread`. +#[derive(Clone, Debug)] +pub(crate) struct UnparkThread { + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + state: AtomicUsize, + mutex: Mutex<()>, + condvar: Condvar, +} + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +thread_local! { + static CURRENT_PARKER: ParkThread = ParkThread::new(); +} + +// ==== impl ParkThread ==== + +impl ParkThread { + pub(crate) fn new() -> Self { + Self { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + }), + } + } +} + +impl Park for ParkThread { + type Unpark = UnparkThread; + type Error = ParkError; + + fn unpark(&self) -> Self::Unpark { + let inner = self.inner.clone(); + UnparkThread { inner } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration); + Ok(()) + } +} + +// ==== impl Inner ==== + +impl Inner { + /// Park the current thread for at most `dur`. + fn park(&self) { + // If we were previously notified then we consume this notification and + // return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + // Otherwise we need to coordinate going to sleep + let mut m = self.mutex.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. 
To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park state; actual = {}", actual), + } + + loop { + m = self.condvar.wait(m).unwrap(); + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; + } + + // spurious wakeup, go back to sleep + } + } + + fn park_timeout(&self, dur: Duration) { + // Like `park` above we have a fast path for an already-notified thread, + // and afterwards we start coordinating for a sleep. return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + let m = self.mutex.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read again here, see `park`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park_timeout state; actual = {}", actual), + } + + // Wait with a timeout, and if we spuriously wake up or otherwise wake up + // from a notification, we just want to unconditionally set the state back to + // empty, either consuming a notification or un-flagging ourselves as + // parked. + let (_m, _result) = self.condvar.wait_timeout(m, dur).unwrap(); + + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {}", n), + } + } + + fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before + // this call, we must perform a release operation that `park` can + // synchronize with. To do that we must write `NOTIFIED` even if `state` + // is already `NOTIFIED`. 
That is why this must be a swap rather than a + // compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + drop(self.mutex.lock().unwrap()); + + self.condvar.notify_one() + } +} + +impl Default for ParkThread { + fn default() -> Self { + Self::new() + } +} + +// ===== impl UnparkThread ===== + +impl Unpark for UnparkThread { + fn unpark(&self) { + self.inner.unpark(); + } +} + +cfg_blocking_impl! { + use std::marker::PhantomData; + use std::rc::Rc; + + use std::mem; + use std::task::{RawWaker, RawWakerVTable, Waker}; + + /// Blocks the current thread using a condition variable. + #[derive(Debug)] + pub(crate) struct CachedParkThread { + _anchor: PhantomData>, + } + + impl CachedParkThread { + /// Create a new `ParkThread` handle for the current thread. + /// + /// This type cannot be moved to other threads, so it should be created on + /// the thread that the caller intends to park. + pub(crate) fn new() -> CachedParkThread { + CachedParkThread { + _anchor: PhantomData, + } + } + + /// Get a reference to the `ParkThread` handle for this thread. 
+ fn with_current(&self, f: F) -> R + where + F: FnOnce(&ParkThread) -> R, + { + CURRENT_PARKER.with(|inner| f(inner)) + } + } + + impl Park for CachedParkThread { + type Unpark = UnparkThread; + type Error = ParkError; + + fn unpark(&self) -> Self::Unpark { + self.with_current(|park_thread| park_thread.unpark()) + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.with_current(|park_thread| park_thread.inner.park()); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.with_current(|park_thread| park_thread.inner.park_timeout(duration)); + Ok(()) + } + } + + + impl UnparkThread { + pub(crate) fn into_waker(self) -> Waker { + unsafe { + let raw = unparker_to_raw_waker(self.inner); + Waker::from_raw(raw) + } + } + } + + impl Inner { + #[allow(clippy::wrong_self_convention)] + fn into_raw(this: Arc) -> *const () { + Arc::into_raw(this) as *const () + } + + unsafe fn from_raw(ptr: *const ()) -> Arc { + Arc::from_raw(ptr as *const Inner) + } + } + + unsafe fn unparker_to_raw_waker(unparker: Arc) -> RawWaker { + RawWaker::new( + Inner::into_raw(unparker), + &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), + ) + } + + unsafe fn clone(raw: *const ()) -> RawWaker { + let unparker = Inner::from_raw(raw); + + // Increment the ref count + mem::forget(unparker.clone()); + + unparker_to_raw_waker(unparker) + } + + unsafe fn drop_waker(raw: *const ()) { + let _ = Inner::from_raw(raw); + } + + unsafe fn wake(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.unpark(); + } + + unsafe fn wake_by_ref(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.unpark(); + + // We don't actually own a reference to the unparker + mem::forget(unparker); + } +} diff --git a/tokio/src/prelude.rs b/tokio/src/prelude.rs index 6e62c519d28..7e482892295 100644 --- a/tokio/src/prelude.rs +++ b/tokio/src/prelude.rs @@ -1,3 +1,5 @@ +#![cfg(not(loom))] + //! A "prelude" for users of the `tokio` crate. //! 
//! This prelude is similar to the standard library's prelude in that you'll diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index affe2a5e66d..d5cf7c761d9 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,4 +1,4 @@ -use crate::runtime::park::{Park, Unpark}; +use crate::park::{Park, Unpark}; use crate::task::{self, JoinHandle, Schedule, Task}; use std::cell::UnsafeCell; @@ -245,7 +245,7 @@ impl SchedulerPriv { } unsafe fn schedule_local(&self, task: Task) { - (*self.local_queue.get()).push_front(task); + (*self.local_queue.get()).push_back(task); } fn next_task(&self, tick: u8) -> Option> { diff --git a/tokio/src/runtime/blocking.rs b/tokio/src/runtime/blocking.rs deleted file mode 100644 index 8408e78b7ee..00000000000 --- a/tokio/src/runtime/blocking.rs +++ /dev/null @@ -1,41 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the blocking -//! pool. When the `blocking` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -cfg_blocking_impl! { - pub(crate) use crate::blocking::BlockingPool; - pub(crate) use crate::blocking::Spawner; - - use crate::runtime::Builder; - - pub(crate) fn create_blocking_pool(builder: &Builder) -> BlockingPool { - BlockingPool::new(builder.thread_name.clone(), builder.thread_stack_size) - } -} - -cfg_not_blocking_impl! 
{ - use crate::runtime::Builder; - - #[derive(Debug, Clone)] - pub(crate) struct BlockingPool {} - - pub(crate) use BlockingPool as Spawner; - - pub(crate) fn create_blocking_pool(_builder: &Builder) -> BlockingPool { - BlockingPool {} - } - - impl BlockingPool { - pub(crate) fn spawner(&self) -> &BlockingPool { - self - } - - pub(crate) fn enter(&self, f: F) -> R - where - F: FnOnce() -> R, - { - f() - } - } -} diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs new file mode 100644 index 00000000000..63c54b7417f --- /dev/null +++ b/tokio/src/runtime/blocking/mod.rs @@ -0,0 +1,63 @@ +//! Abstracts out the APIs necessary to `Runtime` for integrating the blocking +//! pool. When the `blocking` feature flag is **not** enabled, these APIs are +//! shells. This isolates the complexity of dealing with conditional +//! compilation. + +cfg_blocking_impl! { + mod pool; + pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; + + mod schedule; + mod shutdown; + mod task; + + use crate::runtime::{self, Builder, io, time}; + + pub(crate) fn create_blocking_pool( + builder: &Builder, + spawner: &runtime::Spawner, + io: &io::Handle, + time: &time::Handle, + clock: &time::Clock, + ) -> BlockingPool { + BlockingPool::new( + builder, + spawner, + io, + time, + clock) + + } +} + +cfg_not_blocking_impl! 
{ + use crate::runtime::{self, io, time, Builder}; + + #[derive(Debug, Clone)] + pub(crate) struct BlockingPool {} + + pub(crate) use BlockingPool as Spawner; + + pub(crate) fn create_blocking_pool( + _builder: &Builder, + _spawner: &runtime::Spawner, + _io: &io::Handle, + _time: &time::Handle, + _clock: &time::Clock, + ) -> BlockingPool { + BlockingPool {} + } + + impl BlockingPool { + pub(crate) fn spawner(&self) -> &BlockingPool { + self + } + + pub(crate) fn enter(&self, f: F) -> R + where + F: FnOnce() -> R, + { + f() + } + } +} diff --git a/tokio/src/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs similarity index 54% rename from tokio/src/blocking/pool.rs rename to tokio/src/runtime/blocking/pool.rs index f75af78023c..052d361ad55 100644 --- a/tokio/src/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -1,9 +1,11 @@ //! Thread pool for blocking operations -use crate::blocking::schedule::NoopSchedule; -use crate::blocking::task::BlockingTask; use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; +use crate::runtime::{self, io, time, Builder, Callback}; +use crate::runtime::blocking::shutdown; +use crate::runtime::blocking::schedule::NoopSchedule; +use crate::runtime::blocking::task::BlockingTask; use crate::task::{self, JoinHandle}; use std::cell::Cell; @@ -13,6 +15,7 @@ use std::time::Duration; pub(crate) struct BlockingPool { spawner: Spawner, + shutdown_rx: shutdown::Receiver, } #[derive(Clone)] @@ -32,6 +35,25 @@ struct Inner { /// Spawned thread stack size stack_size: Option, + + /// Call after a thread starts + after_start: Option, + + /// Call before a thread stops + before_stop: Option, + + /// Spawns async tasks + spawner: runtime::Spawner, + + /// Runtime I/O driver handle + io_handle: io::Handle, + + /// Runtime time driver handle + time_handle: time::Handle, + + /// Source of `Instant::now()` + clock: time::Clock, + } struct Shared { @@ -40,6 +62,7 @@ struct Shared { num_idle: u32, num_notify: u32, shutdown: bool, 
+ shutdown_tx: Option, } type Task = task::Task; @@ -72,7 +95,15 @@ where // ===== impl BlockingPool ===== impl BlockingPool { - pub(crate) fn new(thread_name: String, stack_size: Option) -> BlockingPool { + pub(crate) fn new( + builder: &Builder, + spawner: &runtime::Spawner, + io: &io::Handle, + time: &time::Handle, + clock: &time::Clock, + ) -> BlockingPool { + let (shutdown_tx, shutdown_rx) = shutdown::channel(); + BlockingPool { spawner: Spawner { inner: Arc::new(Inner { @@ -82,12 +113,20 @@ impl BlockingPool { num_idle: 0, num_notify: 0, shutdown: false, + shutdown_tx: Some(shutdown_tx), }), condvar: Condvar::new(), - thread_name, - stack_size, + thread_name: builder.thread_name.clone(), + stack_size: builder.thread_stack_size, + after_start: builder.after_start.clone(), + before_stop: builder.before_stop.clone(), + spawner: spawner.clone(), + io_handle: io.clone(), + time_handle: time.clone(), + clock: clock.clone(), }), }, + shutdown_rx, } } @@ -99,12 +138,14 @@ impl BlockingPool { impl Drop for BlockingPool { fn drop(&mut self) { let mut shared = self.spawner.inner.shared.lock().unwrap(); + shared.shutdown = true; + shared.shutdown_tx = None; self.spawner.inner.condvar.notify_all(); - while shared.num_th > 0 { - shared = self.spawner.inner.condvar.wait(shared).unwrap(); - } + drop(shared); + + self.shutdown_rx.wait(); } } @@ -116,19 +157,6 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== - -cfg_rt_threaded! 
{ - impl Spawner { - pub(crate) fn spawn_background(&self, func: F) - where - F: FnOnce() + Send + 'static, - { - let task = task::background(BlockingTask::new(func)); - self.schedule(task); - } - } -} - impl Spawner { /// Set the blocking pool for the duration of the closure /// @@ -165,7 +193,7 @@ impl Spawner { } fn schedule(&self, task: Task) { - let should_spawn_thread = { + let shutdown_tx = { let mut shared = self.inner.shared.lock().unwrap(); if shared.shutdown { @@ -180,10 +208,11 @@ impl Spawner { if shared.num_th == MAX_THREADS { // At max number of threads - false + None } else { shared.num_th += 1; - true + assert!(shared.shutdown_tx.is_some()); + shared.shutdown_tx.clone() } } else { // Notify an idle worker thread. The notification counter @@ -194,16 +223,16 @@ impl Spawner { shared.num_idle -= 1; shared.num_notify += 1; self.inner.condvar.notify_one(); - false + None } }; - if should_spawn_thread { - self.spawn_thread(); + if let Some(shutdown_tx) = shutdown_tx { + self.spawn_thread(shutdown_tx); } } - fn spawn_thread(&self) { + fn spawn_thread(&self, shutdown_tx: shutdown::Sender) { let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); if let Some(stack_size) = self.inner.stack_size { @@ -214,71 +243,101 @@ impl Spawner { builder .spawn(move || { - let mut shared = inner.shared.lock().unwrap(); - - 'main: loop { - // BUSY - while let Some(task) = shared.queue.pop_front() { - drop(shared); - run_task(task); - - shared = inner.shared.lock().unwrap(); - if shared.shutdown { - break; // Need to increment idle before we exit - } - } - - // IDLE - shared.num_idle += 1; - - while !shared.shutdown { - let lock_result = inner.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); - - shared = lock_result.0; - let timeout_result = lock_result.1; - - if shared.num_notify != 0 { - // We have received a legitimate wakeup, - // acknowledge it by decrementing the counter - // and transition to the BUSY state. 
- shared.num_notify -= 1; - break; - } - - if timeout_result.timed_out() { - break 'main; - } - - // Spurious wakeup detected, go back to sleep. - } - - if shared.shutdown { - // Work was produced, and we "took" it (by decrementing num_notify). - // This means that num_idle was decremented once for our wakeup. - // But, since we are exiting, we need to "undo" that, as we'll stay idle. - shared.num_idle += 1; - // NOTE: Technically we should also do num_notify++ and notify again, - // but since we're shutting down anyway, that won't be necessary. - break; - } + inner.run(); + + // Make sure `inner` drops first to ensure that the shutdown_rx + // sees all refs to `Inner` are dropped when the `shutdown_rx` + // resolves. + drop(inner); + drop(shutdown_tx); + }) + .unwrap(); + } +} + +impl Inner { + fn run(&self) { + let _io = io::set_default(&self.io_handle); + + time::with_default(&self.time_handle, &self.clock, || { + self.spawner.enter(|| self.run2()); + }); + } + + fn run2(&self) { + if let Some(f) = &self.after_start { + f() + } + + let mut shared = self.shared.lock().unwrap(); + + 'main: loop { + // BUSY + while let Some(task) = shared.queue.pop_front() { + drop(shared); + run_task(task); + + shared = self.shared.lock().unwrap(); + if shared.shutdown { + break; // Need to increment idle before we exit } + } + + // IDLE + shared.num_idle += 1; - // Thread exit - shared.num_th -= 1; + while !shared.shutdown { + let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); - // num_idle should now be tracked exactly, panic - // with a descriptive message if it is not the - // case. 
- shared.num_idle = shared - .num_idle - .checked_sub(1) - .expect("num_idle underflowed on thread exit"); + shared = lock_result.0; + let timeout_result = lock_result.1; - if shared.shutdown && shared.num_th == 0 { - inner.condvar.notify_one(); + if shared.num_notify != 0 { + // We have received a legitimate wakeup, + // acknowledge it by decrementing the counter + // and transition to the BUSY state. + shared.num_notify -= 1; + break; } - }) - .unwrap(); + + if timeout_result.timed_out() { + break 'main; + } + + // Spurious wakeup detected, go back to sleep. + } + + if shared.shutdown { + // Work was produced, and we "took" it (by decrementing num_notify). + // This means that num_idle was decremented once for our wakeup. + // But, since we are exiting, we need to "undo" that, as we'll stay idle. + shared.num_idle += 1; + // NOTE: Technically we should also do num_notify++ and notify again, + // but since we're shutting down anyway, that won't be necessary. + break; + } + } + + // Thread exit + shared.num_th -= 1; + + // num_idle should now be tracked exactly, panic + // with a descriptive message if it is not the + // case. + shared.num_idle = shared + .num_idle + .checked_sub(1) + .expect("num_idle underflowed on thread exit"); + + if shared.shutdown && shared.num_th == 0 { + self.condvar.notify_one(); + } + + drop(shared); + + if let Some(f) = &self.before_stop { + f() + } } } diff --git a/tokio/src/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs similarity index 100% rename from tokio/src/blocking/schedule.rs rename to tokio/src/runtime/blocking/schedule.rs diff --git a/tokio/src/runtime/blocking/shutdown.rs b/tokio/src/runtime/blocking/shutdown.rs new file mode 100644 index 00000000000..d9f5eb0fc69 --- /dev/null +++ b/tokio/src/runtime/blocking/shutdown.rs @@ -0,0 +1,44 @@ +//! A shutdown channel. +//! +//! Each worker holds the `Sender` half. When all the `Sender` halves are +//! dropped, the `Receiver` receives a notification. 
+ +use crate::loom::sync::Arc; +use crate::sync::oneshot; + +#[derive(Debug, Clone)] +pub(super) struct Sender { + tx: Arc>, +} + +#[derive(Debug)] +pub(super) struct Receiver { + rx: oneshot::Receiver<()>, +} + +pub(super) fn channel() -> (Sender, Receiver) { + let (tx, rx) = oneshot::channel(); + let tx = Sender { tx: Arc::new(tx) }; + let rx = Receiver { rx }; + + (tx, rx) +} + +impl Receiver { + /// Block the current thread until all `Sender` handles drop. + pub(crate) fn wait(&mut self) { + use crate::runtime::enter::{enter, try_enter}; + + let mut e = if std::thread::panicking() { + match try_enter() { + Some(enter) => enter, + _ => return, + } + } else { + enter() + }; + + // The oneshot completes with an Err + let _ = e.block_on(&mut self.rx); + } +} diff --git a/tokio/src/blocking/task.rs b/tokio/src/runtime/blocking/task.rs similarity index 100% rename from tokio/src/blocking/task.rs rename to tokio/src/runtime/blocking/task.rs diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 793146f700c..4f36a027b30 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,9 +1,10 @@ -use crate::loom::sync::Arc; -use crate::runtime::handle::{self, Handle}; +use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; -use crate::runtime::{blocking, io, time, Runtime}; +use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; use std::fmt; +#[cfg(not(loom))] +use std::sync::Arc; /// Builds Tokio Runtime with custom configuration values. /// @@ -39,6 +40,12 @@ pub struct Builder { /// The task execution model to use. kind: Kind, + /// Whether or not to enable the I/O driver + enable_io: bool, + + /// Whether or not to enable the time driver + enable_time: bool, + /// The number of worker threads. /// /// Only used when not using the current-thread executor. @@ -51,13 +58,13 @@ pub struct Builder { pub(super) thread_stack_size: Option, /// Callback to run after each thread starts. 
- after_start: Option, + pub(super) after_start: Option, /// To run before each worker thread stops - before_stop: Option, + pub(super) before_stop: Option, } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] enum Kind { Shell, #[cfg(feature = "rt-core")] @@ -66,12 +73,6 @@ enum Kind { ThreadPool, } -#[cfg(not(loom))] -type Callback = Arc; - -#[cfg(loom)] -type Callback = Arc>; - impl Builder { /// Returns a new runtime builder initialized with default configuration /// values. @@ -82,6 +83,12 @@ impl Builder { // No task execution by default kind: Kind::Shell, + // I/O defaults to "off" + enable_io: false, + + // Time defaults to "off" + enable_time: false, + // Default to use an equal number of threads to number of CPU cores num_threads: crate::loom::sys::num_cpus(), @@ -97,6 +104,31 @@ impl Builder { } } + /// Enable both I/O and time drivers. + /// + /// Doing this is a shorthand for calling `enable_io` and `enable_time` + /// individually. If additional components are added to Tokio in the future, + /// `enable_all` will include these future components. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .enable_all() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_all(&mut self) -> &mut Self { + #[cfg(feature = "io-driver")] + self.enable_io(); + #[cfg(feature = "time")] + self.enable_time(); + + self + } + /// Set the maximum number of worker threads for the `Runtime`'s thread pool. 
/// /// This must be a number between 1 and 32,768 though it is advised to keep @@ -107,14 +139,12 @@ impl Builder { /// # Examples /// /// ``` - /// # use tokio::runtime; + /// use tokio::runtime; /// - /// # pub fn main() { /// let rt = runtime::Builder::new() /// .num_threads(4) /// .build() /// .unwrap(); - /// # } /// ``` pub fn num_threads(&mut self, val: usize) -> &mut Self { self.num_threads = val; @@ -194,14 +224,14 @@ impl Builder { /// /// # pub fn main() { /// let runtime = runtime::Builder::new() - /// .after_start(|| { + /// .on_thread_start(|| { /// println!("thread started"); /// }) /// .build(); /// # } /// ``` #[cfg(not(loom))] - pub fn after_start(&mut self, f: F) -> &mut Self + pub fn on_thread_start(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static, { @@ -220,14 +250,14 @@ impl Builder { /// /// # pub fn main() { /// let runtime = runtime::Builder::new() - /// .before_stop(|| { + /// .on_thread_stop(|| { /// println!("thread stopping"); /// }) /// .build(); /// # } /// ``` #[cfg(not(loom))] - pub fn before_stop(&mut self, f: F) -> &mut Self + pub fn on_thread_stop(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static, { @@ -266,21 +296,21 @@ impl Builder { let clock = time::create_clock(); // Create I/O driver - let (io_driver, handle) = io::create_driver()?; - let io_handles = vec![handle]; + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); - let (driver, handle) = time::create_driver(io_driver, clock.clone()); - let time_handles = vec![handle]; + let spawner = Spawner::Shell; - let blocking_pool = blocking::create_blocking_pool(self); + let blocking_pool = + blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { kind: Kind::Shell(Shell::new(driver)), handle: Handle { - kind: handle::Kind::Shell, - 
io_handles, - time_handles, + spawner, + io_handle, + time_handle, clock, blocking_spawner, }, @@ -289,6 +319,53 @@ impl Builder { } } +cfg_io_driver! { + impl Builder { + /// Enable the I/O driver. + /// + /// Doing this enables using net, process, signal, and some I/O types on + /// the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .enable_io() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_io(&mut self) -> &mut Self { + self.enable_io = true; + self + } + } +} + +cfg_time! { + impl Builder { + /// Enable the time driver. + /// + /// Doing this enables using `tokio::time` on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .enable_time() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_time(&mut self) -> &mut Self { + self.enable_time = true; + self + } + } +} + cfg_rt_core! { impl Builder { fn build_basic_runtime(&mut self) -> io::Result { @@ -297,29 +374,27 @@ cfg_rt_core! { let clock = time::create_clock(); // Create I/O driver - let (io_driver, handle) = io::create_driver()?; - let io_handles = vec![handle]; + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, handle) = time::create_driver(io_driver, clock.clone()); - let time_handles = vec![handle]; + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); // And now put a single-threaded scheduler on top of the timer. When // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. 
let scheduler = BasicScheduler::new(driver); - let spawner = scheduler.spawner(); + let spawner = Spawner::Basic(scheduler.spawner()); // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self); + let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { kind: Kind::Basic(scheduler), handle: Handle { - kind: handle::Kind::Basic(spawner), - io_handles, - time_handles, + spawner, + io_handle, + time_handle, clock, blocking_spawner, }, @@ -333,77 +408,30 @@ cfg_rt_threaded! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { use crate::runtime::{Kind, ThreadPool}; - use std::sync::Mutex; + use crate::runtime::park::Parker; let clock = time::create_clock(); - let mut io_handles = Vec::new(); - let mut time_handles = Vec::new(); - let mut drivers = Vec::new(); - - for _ in 0..self.num_threads { - // Create I/O driver and handle - let (io_driver, handle) = io::create_driver()?; - io_handles.push(handle); - - // Create a new timer. 
- let (time_driver, handle) = time::create_driver(io_driver, clock.clone()); - time_handles.push(handle); - drivers.push(Mutex::new(Some(time_driver))); - } + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (scheduler, workers) = ThreadPool::new(self.num_threads, Parker::new(driver)); + let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool - let blocking_pool = blocking::create_blocking_pool(self); + let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock); let blocking_spawner = blocking_pool.spawner().clone(); - let scheduler = { - let clock = clock.clone(); - let io_handles = io_handles.clone(); - let time_handles = time_handles.clone(); - - let after_start = self.after_start.clone(); - let before_stop = self.before_stop.clone(); - - let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| { - // Configure the I/O driver - let _io = io::set_default(&io_handles[index]); - - // Configure time - time::with_default(&time_handles[index], &clock, || { - // Call the start callback - if let Some(after_start) = after_start.as_ref() { - after_start(); - } - - // Run the worker - next(); - - // Call the after call back - if let Some(before_stop) = before_stop.as_ref() { - before_stop(); - } - }) - }) - as Box); - - ThreadPool::new( - self.num_threads, - blocking_pool.spawner().clone(), - around_worker, - move |index| drivers[index].lock().unwrap().take().unwrap(), - ) - }; - - let spawner = scheduler.spawner().clone(); + // Spawn the thread pool workers + workers.spawn(&blocking_spawner); Ok(Runtime { kind: Kind::ThreadPool(scheduler), handle: Handle { - kind: handle::Kind::ThreadPool(spawner), - io_handles, - time_handles, + spawner, + io_handle, + time_handle, clock, - blocking_spawner, + blocking_spawner: blocking_spawner.clone(), }, blocking_pool, }) diff 
--git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index a0925cb494d..1995da63c27 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -46,36 +46,36 @@ pub(crate) fn try_enter() -> Option { // // This is hidden for a reason. Do not use without fully understanding // executors. Misuing can easily cause your program to deadlock. -cfg_rt_threaded! { - #[cfg(feature = "blocking")] - pub(crate) fn exit R, R>(f: F) -> R { - // Reset in case the closure panics - struct Reset; - impl Drop for Reset { - fn drop(&mut self) { - ENTERED.with(|c| { - c.set(true); - }); - } +#[cfg(all(feature = "rt-threaded", feature = "blocking"))] +pub(crate) fn exit R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset; + impl Drop for Reset { + fn drop(&mut self) { + ENTERED.with(|c| { + c.set(true); + }); } + } - ENTERED.with(|c| { - debug_assert!(c.get()); - c.set(false); - }); + ENTERED.with(|c| { + debug_assert!(c.get()); + c.set(false); + }); - let reset = Reset; - let ret = f(); - ::std::mem::forget(reset); + let reset = Reset; + let ret = f(); + ::std::mem::forget(reset); - ENTERED.with(|c| { - assert!(!c.get(), "closure claimed permanent executor"); - c.set(true); - }); + ENTERED.with(|c| { + assert!(!c.get(), "closure claimed permanent executor"); + c.set(true); + }); - ret - } + ret +} +cfg_blocking_impl! { impl Enter { /// Blocks the thread on the specified future, returning the value with /// which that future completes. @@ -83,7 +83,7 @@ cfg_rt_threaded! 
{ where F: std::future::Future, { - use crate::runtime::park::{CachedParkThread, Park}; + use crate::park::{CachedParkThread, Park}; use std::pin::Pin; use std::task::Context; use std::task::Poll::Ready; diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index f8119e65577..562a33cee56 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,42 +1,29 @@ -use crate::runtime::{blocking, io, time}; +use crate::runtime::{blocking, io, time, Spawner}; cfg_rt_core! { - use crate::runtime::basic_scheduler; use crate::task::JoinHandle; use std::future::Future; } -cfg_rt_threaded! { - use crate::runtime::thread_pool; -} - /// Handle to the runtime #[derive(Debug, Clone)] pub struct Handle { - pub(super) kind: Kind, + pub(super) spawner: Spawner, /// Handles to the I/O drivers - pub(super) io_handles: Vec, + pub(super) io_handle: io::Handle, /// Handles to the time drivers - pub(super) time_handles: Vec, + pub(super) time_handle: time::Handle, + /// Source of `Instant::now()` pub(super) clock: time::Clock, /// Blocking pool spawner pub(super) blocking_spawner: blocking::Spawner, } -#[derive(Debug, Clone)] -pub(super) enum Kind { - Shell, - #[cfg(feature = "rt-core")] - Basic(basic_scheduler::Spawner), - #[cfg(feature = "rt-threaded")] - ThreadPool(thread_pool::Spawner), -} - impl Handle { /// Enter the runtime context pub fn enter(&self, f: F) -> R @@ -44,15 +31,9 @@ impl Handle { F: FnOnce() -> R, { self.blocking_spawner.enter(|| { - let _io = io::set_default(&self.io_handles[0]); + let _io = io::set_default(&self.io_handle); - time::with_default(&self.time_handles[0], &self.clock, || match &self.kind { - Kind::Shell => f(), - #[cfg(feature = "rt-core")] - Kind::Basic(spawner) => spawner.enter(f), - #[cfg(feature = "rt-threaded")] - Kind::ThreadPool(spawner) => spawner.enter(f), - }) + time::with_default(&self.time_handle, &self.clock, || self.spawner.enter(f)) }) } } @@ -95,13 +76,7 @@ cfg_rt_core! 
{ F: Future + Send + 'static, F::Output: Send + 'static, { - match &self.kind { - Kind::Shell => panic!("spawning not enabled for runtime"), - #[cfg(feature = "rt-core")] - Kind::Basic(spawner) => spawner.spawn(future), - #[cfg(feature = "rt-threaded")] - Kind::ThreadPool(spawner) => spawner.spawn(future), - } + self.spawner.spawn(future) } } } diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs index 014d39de259..e912f6f7e46 100644 --- a/tokio/src/runtime/io.rs +++ b/tokio/src/runtime/io.rs @@ -6,8 +6,12 @@ /// Re-exported for convenience. pub(crate) use std::io::Result; -cfg_io_driver! { +pub(crate) use variant::*; + +#[cfg(all(feature = "io-driver", not(loom)))] +mod variant { use crate::io::driver; + use crate::park::{Either, ParkThread}; use std::io; @@ -16,27 +20,33 @@ cfg_io_driver! { /// When the `io-driver` feature is enabled, this is the "real" I/O driver /// backed by Mio. Without the `io-driver` feature, this is a thread parker /// backed by a condition variable. - pub(crate) type Driver = driver::Driver; + pub(crate) type Driver = Either; /// The handle the runtime stores for future use. /// /// When the `io-driver` feature is **not** enabled, this is `()`. - pub(crate) type Handle = driver::Handle; + pub(crate) type Handle = Option; - pub(crate) fn create_driver() -> io::Result<(Driver, Handle)> { - let driver = driver::Driver::new()?; - let handle = driver.handle(); + pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> { + if enable { + let driver = driver::Driver::new()?; + let handle = driver.handle(); - Ok((driver, handle)) + Ok((Either::A(driver), Some(handle))) + } else { + let driver = ParkThread::new(); + Ok((Either::B(driver), None)) + } } - pub(crate) fn set_default(handle: &Handle) -> driver::DefaultGuard<'_> { - driver::set_default(handle) + pub(crate) fn set_default(handle: &Handle) -> Option> { + handle.as_ref().map(|handle| driver::set_default(handle)) } } -cfg_not_io_driver! 
{ - use crate::runtime::park::ParkThread; +#[cfg(any(not(feature = "io-driver"), loom))] +mod variant { + use crate::park::ParkThread; use std::io; @@ -46,7 +56,7 @@ cfg_not_io_driver! { /// There is no handle pub(crate) type Handle = (); - pub(crate) fn create_driver() -> io::Result<(Driver, Handle)> { + pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> { let driver = ParkThread::new(); Ok((driver, ())) diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index eef4872e95f..7225ea8d393 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -153,6 +153,14 @@ //! Most applications should use the threaded scheduler, except in some niche //! use-cases, such as when running only a single thread is required. //! +//! #### Resource drivers +//! +//! When configuring a runtime by hand, no resource drivers are enabled by +//! default. In this case, attempting to use networking types or time types will +//! fail. In order to enable these types, the resource drivers must be enabled. +//! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a +//! shorthand, [`Builder::enable_all`] enables both resource drivers. +//! //! [tasks]: crate::task //! [driver]: crate::io::driver //! [executor]: https://tokio.rs/docs/internals/runtime-model/#executors @@ -165,6 +173,9 @@ //! [`Runtime::new`]: crate::runtime::Runtime::new //! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler //! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler +//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io +//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time +//! [`Builder::enable_all`]: crate::runtime::Builder::enable_all // At the top due to macros #[cfg(test)] @@ -179,6 +190,10 @@ cfg_rt_core! { mod blocking; use blocking::BlockingPool; +cfg_blocking_impl! 
{ + pub(crate) use blocking::spawn_blocking; +} + mod builder; pub use self::builder::Builder; @@ -195,12 +210,17 @@ pub use self::handle::Handle; mod io; -mod park; -pub use self::park::{Park, Unpark}; +cfg_rt_threaded! { + mod park; + use park::{Parker, Unparker}; +} mod shell; use self::shell::Shell; +mod spawner; +use self::spawner::Spawner; + mod time; cfg_rt_threaded! { @@ -271,6 +291,9 @@ enum Kind { ThreadPool(ThreadPool), } +/// After thread starts / before thread stops +type Callback = ::std::sync::Arc; + impl Runtime { /// Create a new runtime instance with default configuration values. /// @@ -309,13 +332,13 @@ impl Runtime { /// [runtime builder]: crate::runtime::Builder pub fn new() -> io::Result { #[cfg(feature = "rt-threaded")] - let ret = Builder::new().threaded_scheduler().build(); + let ret = Builder::new().threaded_scheduler().enable_all().build(); #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))] - let ret = Builder::new().basic_scheduler().build(); + let ret = Builder::new().basic_scheduler().enable_all().build(); #[cfg(not(feature = "rt-core"))] - let ret = Builder::new().build(); + let ret = Builder::new().enable_all().build(); ret } diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs new file mode 100644 index 00000000000..7681e20fe4a --- /dev/null +++ b/tokio/src/runtime/park.rs @@ -0,0 +1,225 @@ +//! Parks the runtime. +//! +//! A combination of the various resource driver park handles. 
+ +use crate::loom::sync::{Arc, Mutex, Condvar}; +use crate::loom::sync::atomic::AtomicUsize; +use crate::park::{Park, Unpark}; +use crate::runtime::time; +use crate::util::TryLock; + +use std::sync::atomic::Ordering::SeqCst; +use std::time::Duration; + +pub(crate) struct Parker { + inner: Arc, +} + +pub(crate) struct Unparker { + inner: Arc, +} + +struct Inner { + /// Avoids entering the park if possible + state: AtomicUsize, + + /// Used to coordinate access to the driver / condvar + mutex: Mutex<()>, + + /// Condvar to block on if the driver is unavailable. + condvar: Condvar, + + /// Resource (I/O, time, ...) driver + shared: Arc, +} + +const EMPTY: usize = 0; +const PARKED_CONDVAR: usize = 1; +const PARKED_DRIVER: usize = 2; +const NOTIFIED: usize = 3; + +/// Shared across multiple Parker handles +struct Shared { + /// Shared driver. Only one thread at a time can use this + driver: TryLock, + + /// Unpark handle + handle: ::Unpark, +} + +impl Parker { + pub(crate) fn new(driver: time::Driver) -> Parker { + let handle = driver.unpark(); + + Parker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + shared: Arc::new(Shared { + driver: TryLock::new(driver), + handle, + }), + }), + } + } +} + +impl Clone for Parker { + fn clone(&self) -> Parker { + Parker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + shared: self.inner.shared.clone(), + }), + } + } +} + +impl Park for Parker { + type Unpark = Unparker; + type Error = (); + + fn unpark(&self) -> Unparker { + Unparker { inner: self.inner.clone() } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + // Only parking with zero is supported... 
+ assert_eq!(duration, Duration::from_millis(0)); + + if let Some(mut driver) = self.inner.shared.driver.try_lock() { + driver.park_timeout(duration) + .map_err(|_| ()) + } else { + Ok(()) + } + } +} + +impl Unpark for Unparker { + fn unpark(&self) { + self.inner.unpark(); + } +} + +impl Inner { + /// Park the current thread for at most `dur`. + fn park(&self) { + // If we were previously notified then we consume this notification and + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + + if let Some(mut driver) = self.shared.driver.try_lock() { + self.park_driver(&mut driver); + } else { + self.park_condvar(); + } + } + + fn park_condvar(&self) { + // Otherwise we need to coordinate going to sleep + let mut m = self.mutex.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park state; actual = {}", actual), + } + + loop { + m = self.condvar.wait(m).unwrap(); + + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + // got a notification + return; + } + + // spurious wakeup, go back to sleep + } + } + + fn park_driver(&self, driver: &mut time::Driver) { + match self.state.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. 
+ // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park state; actual = {}", actual), + } + + // TODO: don't unwrap + driver.park().unwrap(); + + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED_DRIVER => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {}", n), + } + } + + fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before + // this call, we must perform a release operation that `park` can + // synchronize with. To do that we must write `NOTIFIED` even if `state` + // is already `NOTIFIED`. That is why this must be a swap rather than a + // compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => {}, // no one was waiting + NOTIFIED => {}, // already unparked + PARKED_CONDVAR => self.unpark_condvar(), + PARKED_DRIVER => self.unpark_driver(), + actual => panic!("inconsistent state in unpark; actual = {}", actual), + } + } + + fn unpark_condvar(&self) { + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. 
+ // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + drop(self.mutex.lock().unwrap()); + + self.condvar.notify_one() + } + + fn unpark_driver(&self) { + self.shared.handle.unpark(); + } +} diff --git a/tokio/src/runtime/park/thread.rs b/tokio/src/runtime/park/thread.rs deleted file mode 100644 index 71bd5b92af5..00000000000 --- a/tokio/src/runtime/park/thread.rs +++ /dev/null @@ -1,280 +0,0 @@ -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::{Arc, Condvar, Mutex}; -use crate::runtime::park::{Park, Unpark}; - -use std::marker::PhantomData; -use std::rc::Rc; -use std::sync::atomic::Ordering; -use std::time::Duration; - -/// Blocks the current thread using a condition variable. -/// -/// Implements the [`Park`] functionality by using a condition variable. An -/// atomic variable is also used to avoid using the condition variable if -/// possible. -/// -/// The condition variable is cached in a thread-local variable and is shared -/// across all `ParkThread` instances created on the same thread. This also -/// means that an instance of `ParkThread` might be unblocked by a handle -/// associated with a different `ParkThread` instance. -#[derive(Debug)] -pub(crate) struct CachedParkThread { - _anchor: PhantomData>, -} - -#[derive(Debug)] -pub(crate) struct ParkThread { - inner: Arc, -} - -/// Error returned by [`ParkThread`] -/// -/// This currently is never returned, but might at some point in the future. -/// -/// [`ParkThread`]: struct.ParkThread.html -#[derive(Debug)] -pub(crate) struct ParkError { - _p: (), -} - -/// Unblocks a thread that was blocked by `ParkThread`. 
-#[derive(Clone, Debug)] -pub(crate) struct UnparkThread { - inner: Arc, -} - -#[derive(Debug)] -struct Inner { - state: AtomicUsize, - mutex: Mutex<()>, - condvar: Condvar, -} - -const IDLE: usize = 0; -const NOTIFY: usize = 1; -const SLEEP: usize = 2; - -thread_local! { - static CURRENT_PARKER: ParkThread = ParkThread::new(); -} - -// ==== impl ParkThread ==== - -impl ParkThread { - pub(crate) fn new() -> Self { - Self { - inner: Arc::new(Inner { - state: AtomicUsize::new(IDLE), - mutex: Mutex::new(()), - condvar: Condvar::new(), - }), - } - } -} - -impl Park for ParkThread { - type Unpark = UnparkThread; - type Error = ParkError; - - fn unpark(&self) -> Self::Unpark { - let inner = self.inner.clone(); - UnparkThread { inner } - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park(None) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.inner.park(Some(duration)) - } -} - -// ==== impl Inner ==== - -impl Inner { - /// Park the current thread for at most `dur`. - fn park(&self, timeout: Option) -> Result<(), ParkError> { - // If currently notified, then we skip sleeping. This is checked outside - // of the lock to avoid acquiring a mutex if not necessary. - match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { - NOTIFY => return Ok(()), - IDLE => {} - _ => unreachable!(), - } - - // The state is currently idle, so obtain the lock and then try to - // transition to a sleeping state. - let mut m = self.mutex.lock().unwrap(); - - // Transition to sleeping - match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) { - NOTIFY => { - // Notified before we could sleep, consume the notification and - // exit - self.state.store(IDLE, Ordering::SeqCst); - return Ok(()); - } - IDLE => {} - _ => unreachable!(), - } - - m = match timeout { - Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0, - None => self.condvar.wait(m).unwrap(), - }; - - // Transition back to idle. 
If the state has transitioned to `NOTIFY`, - // this will consume that notification - self.state.store(IDLE, Ordering::SeqCst); - - // Explicitly drop the mutex guard. There is no real point in doing it - // except that I find it helpful to make it explicit where we want the - // mutex to unlock. - drop(m); - - Ok(()) - } - - fn unpark(&self) { - // First, try transitioning from IDLE -> NOTIFY, this does not require a - // lock. - match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { - IDLE | NOTIFY => return, - SLEEP => {} - _ => unreachable!(), - } - - // The other half is sleeping, this requires a lock - let _m = self.mutex.lock().unwrap(); - - // Transition to NOTIFY - match self.state.swap(NOTIFY, Ordering::SeqCst) { - SLEEP => {} - NOTIFY => return, - IDLE => return, - _ => unreachable!(), - } - - // Wakeup the sleeper - self.condvar.notify_one(); - } -} - -// ===== impl ParkThread ===== - -impl CachedParkThread { - /// Create a new `ParkThread` handle for the current thread. - /// - /// This type cannot be moved to other threads, so it should be created on - /// the thread that the caller intends to park. - #[cfg(feature = "rt-threaded")] - pub(crate) fn new() -> CachedParkThread { - CachedParkThread { - _anchor: PhantomData, - } - } - - /// Get a reference to the `ParkThread` handle for this thread. 
- fn with_current(&self, f: F) -> R - where - F: FnOnce(&ParkThread) -> R, - { - CURRENT_PARKER.with(|inner| f(inner)) - } -} - -impl Park for CachedParkThread { - type Unpark = UnparkThread; - type Error = ParkError; - - fn unpark(&self) -> Self::Unpark { - self.with_current(|park_thread| park_thread.unpark()) - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park(None))?; - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park(Some(duration)))?; - Ok(()) - } -} - -impl Default for ParkThread { - fn default() -> Self { - Self::new() - } -} - -// ===== impl UnparkThread ===== - -impl Unpark for UnparkThread { - fn unpark(&self) { - self.inner.unpark(); - } -} - -#[cfg(feature = "rt-threaded")] -mod waker { - use super::{Inner, UnparkThread}; - use crate::loom::sync::Arc; - - use std::mem; - use std::task::{RawWaker, RawWakerVTable, Waker}; - - impl UnparkThread { - pub(crate) fn into_waker(self) -> Waker { - unsafe { - let raw = unparker_to_raw_waker(self.inner); - Waker::from_raw(raw) - } - } - } - - impl Inner { - #[allow(clippy::wrong_self_convention)] - fn into_raw(this: Arc) -> *const () { - Arc::into_raw(this) as *const () - } - - unsafe fn from_raw(ptr: *const ()) -> Arc { - Arc::from_raw(ptr as *const Inner) - } - } - - unsafe fn unparker_to_raw_waker(unparker: Arc) -> RawWaker { - RawWaker::new( - Inner::into_raw(unparker), - &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), - ) - } - - unsafe fn clone(raw: *const ()) -> RawWaker { - let unparker = Inner::from_raw(raw); - - // Increment the ref count - mem::forget(unparker.clone()); - - unparker_to_raw_waker(unparker) - } - - unsafe fn drop_waker(raw: *const ()) { - let _ = Inner::from_raw(raw); - } - - unsafe fn wake(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); - } - - unsafe fn wake_by_ref(raw: *const ()) { - let 
unparker = Inner::from_raw(raw); - unparker.unpark(); - - // We don't actually own a reference to the unparker - mem::forget(unparker); - } -} diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index 552786677f0..98d0ee6fe58 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -1,5 +1,6 @@ +use crate::park::Park; +use crate::runtime::enter; use crate::runtime::time; -use crate::runtime::{enter, io, Park}; use std::future::Future; use std::mem::ManuallyDrop; @@ -16,11 +17,12 @@ pub(super) struct Shell { waker: Waker, } -type Handle = ::Unpark; +type Handle = ::Unpark; impl Shell { pub(super) fn new(driver: time::Driver) -> Shell { - let unpark = Arc::new(driver.unpark()); + // Make sure we don't mess up types (as we do casts later) + let unpark: Arc = Arc::new(driver.unpark()); let raw_waker = RawWaker::new( Arc::into_raw(unpark) as *const Handle as *const (), @@ -62,13 +64,13 @@ fn clone_waker(ptr: *const ()) -> RawWaker { } fn wake(ptr: *const ()) { - use crate::runtime::park::Unpark; + use crate::park::Unpark; let unpark = unsafe { Arc::from_raw(ptr as *const Handle) }; (unpark).unpark() } fn wake_by_ref(ptr: *const ()) { - use crate::runtime::park::Unpark; + use crate::park::Unpark; let unpark = ptr as *const Handle; unsafe { (*unpark).unpark() } diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs new file mode 100644 index 00000000000..cc2fbb53b5c --- /dev/null +++ b/tokio/src/runtime/spawner.rs @@ -0,0 +1,53 @@ +cfg_rt_core! { + use crate::runtime::basic_scheduler; + use crate::task::JoinHandle; + + use std::future::Future; +} + +cfg_rt_threaded! 
{ + use crate::runtime::thread_pool; +} + +#[derive(Debug, Clone)] +pub(crate) enum Spawner { + Shell, + #[cfg(feature = "rt-core")] + Basic(basic_scheduler::Spawner), + #[cfg(feature = "rt-threaded")] + ThreadPool(thread_pool::Spawner), +} + +impl Spawner { + /// Enter the scheduler context + pub(crate) fn enter(&self, f: F) -> R + where + F: FnOnce() -> R, + { + match self { + Spawner::Shell => f(), + #[cfg(feature = "rt-core")] + Spawner::Basic(spawner) => spawner.enter(f), + #[cfg(feature = "rt-threaded")] + Spawner::ThreadPool(spawner) => spawner.enter(f), + } + } +} + +cfg_rt_core! { + impl Spawner { + pub(crate) fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Spawner::Shell => panic!("spawning not enabled for runtime"), + #[cfg(feature = "rt-core")] + Spawner::Basic(spawner) => spawner.spawn(future), + #[cfg(feature = "rt-threaded")] + Spawner::ThreadPool(spawner) => spawner.spawn(future), + } + } + } +} diff --git a/tokio/src/runtime/tests/mock_park.rs b/tokio/src/runtime/tests/mock_park.rs deleted file mode 100644 index 0fe28b36107..00000000000 --- a/tokio/src/runtime/tests/mock_park.rs +++ /dev/null @@ -1,66 +0,0 @@ -#![allow(warnings)] - -use crate::runtime::{Park, Unpark}; - -use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; -use std::sync::Arc; -use std::time::Duration; - -pub struct MockPark { - parks: HashMap>, -} - -#[derive(Clone)] -struct ParkImpl(Arc); - -struct Inner { - unparked: AtomicBool, -} - -impl MockPark { - pub fn new() -> MockPark { - MockPark { - parks: HashMap::new(), - } - } - - pub fn is_unparked(&self, index: usize) -> bool { - self.parks[&index].unparked.load(SeqCst) - } - - pub fn clear(&self, index: usize) { - self.parks[&index].unparked.store(false, SeqCst); - } - - pub fn mk_park(&mut self, index: usize) -> impl Park { - let inner = Arc::new(Inner { - unparked: AtomicBool::new(false), - }); - 
self.parks.insert(index, inner.clone()); - ParkImpl(inner) - } -} - -impl Park for ParkImpl { - type Unpark = ParkImpl; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - self.clone() - } - - fn park(&mut self) -> Result<(), Self::Error> { - unimplemented!(); - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - unimplemented!(); - } -} - -impl Unpark for ParkImpl { - fn unpark(&self) { - self.0.unparked.store(true, SeqCst); - } -} diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 99ed8cd815f..a1910a444cd 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -2,6 +2,3 @@ #[cfg(loom)] pub(crate) mod loom_oneshot; - -#[cfg(not(loom))] -pub(crate) mod mock_park; diff --git a/tokio/src/runtime/thread_pool/current.rs b/tokio/src/runtime/thread_pool/current.rs index 1ab83c54f46..60a207234e0 100644 --- a/tokio/src/runtime/thread_pool/current.rs +++ b/tokio/src/runtime/thread_pool/current.rs @@ -1,5 +1,4 @@ use crate::loom::sync::Arc; -use crate::runtime::park::Unpark; use crate::runtime::thread_pool::{slice, Owned}; use std::cell::Cell; @@ -23,10 +22,9 @@ struct Inner { // Pointer to the current worker info thread_local!(static CURRENT_WORKER: Cell = Cell::new(Inner::new())); -pub(super) fn set(pool: &Arc>, index: usize, f: F) -> R +pub(super) fn set(pool: &Arc, index: usize, f: F) -> R where F: FnOnce() -> R, - P: Unpark, { CURRENT_WORKER.with(|cell| { assert!(cell.get().workers.is_null()); @@ -65,10 +63,7 @@ where } impl Current { - pub(super) fn as_member<'a, P>(&self, set: &'a slice::Set

) -> Option<&'a Owned

> - where - P: Unpark, - { + pub(super) fn as_member<'a>(&self, set: &'a slice::Set) -> Option<&'a Owned> { let inner = CURRENT_WORKER.with(|cell| cell.get()); if ptr::eq(inner.workers as *const _, set.shared().as_ptr()) { diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 4b23c3b90ed..3d795fa427a 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -18,9 +18,8 @@ mod slice; mod shared; use self::shared::Shared; -mod shutdown; - mod worker; +use worker::Worker; cfg_blocking! { pub(crate) use worker::block_in_place; @@ -39,9 +38,7 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; #[cfg(loom)] const LOCAL_QUEUE_CAPACITY: usize = 2; -use crate::blocking; -use crate::loom::sync::Arc; -use crate::runtime::Park; +use crate::runtime::{self, blocking, Parker}; use crate::task::JoinHandle; use std::fmt; @@ -50,48 +47,29 @@ use std::future::Future; /// Work-stealing based thread pool for executing futures. pub(crate) struct ThreadPool { spawner: Spawner, - - /// Shutdown waiter - shutdown_rx: shutdown::Receiver, } -// The Arc> is needed because loom doesn't support Arc where T: !Sized -// loom doesn't support that because it requires CoerceUnsized, which is -// unstable -type Callback = Arc>; +pub(crate) struct Workers { + workers: Vec, +} impl ThreadPool { - pub(crate) fn new( + pub(crate) fn new( pool_size: usize, - blocking_pool: blocking::Spawner, - around_worker: Callback, - mut build_park: F, - ) -> ThreadPool - where - F: FnMut(usize) -> P, - P: Park + Send + 'static, - { - let (shutdown_tx, shutdown_rx) = shutdown::channel(); - - let (pool, workers) = worker::create_set::<_, BoxedPark

>( + parker: Parker, + ) -> (ThreadPool, Workers) { + let (pool, workers) = worker::create_set( pool_size, - |i| BoxedPark::new(build_park(i)), - blocking_pool.clone(), - around_worker, - shutdown_tx, + parker, ); - // Spawn threads for each worker - for worker in workers { - blocking_pool.spawn_background(|| worker.run()); - } - let spawner = Spawner::new(pool); - ThreadPool { + let pool = ThreadPool { spawner, - shutdown_rx, - } + }; + + (pool, Workers { workers }) } /// Returns reference to `Spawner`. @@ -124,13 +102,6 @@ impl ThreadPool { enter.block_on(future) }) } - - /// Shutdown the thread pool. - pub(crate) fn shutdown_now(&mut self) { - if self.spawner.workers().close() { - self.shutdown_rx.wait(); - } - } } impl fmt::Debug for ThreadPool { @@ -141,37 +112,17 @@ impl fmt::Debug for ThreadPool { impl Drop for ThreadPool { fn drop(&mut self) { - self.shutdown_now(); + self.spawner.workers().close(); } } -// TODO: delete? -pub(crate) struct BoxedPark

{ - inner: P, -} - -impl

BoxedPark

{ - pub(crate) fn new(inner: P) -> Self { - BoxedPark { inner } - } -} - -impl

Park for BoxedPark

-where - P: Park, -{ - type Unpark = Box; - type Error = P::Error; - - fn unpark(&self) -> Self::Unpark { - Box::new(self.inner.unpark()) - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park() - } - - fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> { - self.inner.park_timeout(duration) +impl Workers { + pub(crate) fn spawn(self, blocking_pool: &blocking::Spawner) { + blocking_pool.enter(|| { + for worker in self.workers { + let b = blocking_pool.clone(); + runtime::spawn_blocking(move || worker.run(b)); + } + }); } } diff --git a/tokio/src/runtime/thread_pool/owned.rs b/tokio/src/runtime/thread_pool/owned.rs index 88284d5ef56..b60eb7f3d31 100644 --- a/tokio/src/runtime/thread_pool/owned.rs +++ b/tokio/src/runtime/thread_pool/owned.rs @@ -7,7 +7,7 @@ use std::cell::Cell; /// Per-worker data accessible only by the thread driving the worker. #[derive(Debug)] -pub(super) struct Owned { +pub(super) struct Owned { /// Worker generation. This guards concurrent access to the `Owned` struct. /// When a worker starts running, it checks that the generation it has /// assigned matches the current generation. When it does, the worker has @@ -36,17 +36,14 @@ pub(super) struct Owned { pub(super) rand: FastRand, /// Work queue - pub(super) work_queue: queue::Worker>, + pub(super) work_queue: queue::Worker, /// List of tasks owned by the worker - pub(super) owned_tasks: task::OwnedList>, + pub(super) owned_tasks: task::OwnedList, } -impl

Owned

-where - P: 'static, -{ - pub(super) fn new(work_queue: queue::Worker>, rand: FastRand) -> Owned

{ +impl Owned { + pub(super) fn new(work_queue: queue::Worker, rand: FastRand) -> Owned { Owned { generation: AtomicUsize::new(0), tick: Cell::new(1), @@ -61,7 +58,7 @@ where } /// Returns `true` if a worker should be notified - pub(super) fn submit_local(&self, task: Task>) -> bool { + pub(super) fn submit_local(&self, task: Task) -> bool { let ret = self.work_queue.push(task); if self.defer_notification.get() { @@ -72,15 +69,15 @@ where } } - pub(super) fn submit_local_yield(&self, task: Task>) { + pub(super) fn submit_local_yield(&self, task: Task) { self.work_queue.push_yield(task); } - pub(super) fn bind_task(&mut self, task: &Task>) { + pub(super) fn bind_task(&mut self, task: &Task) { self.owned_tasks.insert(task); } - pub(super) fn release_task(&mut self, task: &Task>) { + pub(super) fn release_task(&mut self, task: &Task) { self.owned_tasks.remove(task); } } diff --git a/tokio/src/runtime/thread_pool/shared.rs b/tokio/src/runtime/thread_pool/shared.rs index 99981151d24..86c784ad486 100644 --- a/tokio/src/runtime/thread_pool/shared.rs +++ b/tokio/src/runtime/thread_pool/shared.rs @@ -1,4 +1,5 @@ -use crate::runtime::park::Unpark; +use crate::park::Unpark; +use crate::runtime::Unparker; use crate::runtime::thread_pool::slice; use crate::task::{self, Schedule, Task}; @@ -11,12 +12,9 @@ use std::ptr; /// - other workers /// - tasks /// -pub(crate) struct Shared

-where - P: 'static, -{ +pub(crate) struct Shared { /// Thread unparker - unpark: P, + unpark: Unparker, /// Tasks pending drop. Any worker pushes tasks, only the "owning" worker /// pops. @@ -26,17 +24,14 @@ where /// /// The slice::Set itself is tracked by an `Arc`, but this pointer is not /// included in the ref count. - slices: *const slice::Set

, + slices: *const slice::Set, } -unsafe impl Send for Shared

{} -unsafe impl Sync for Shared

{} +unsafe impl Send for Shared {} +unsafe impl Sync for Shared {} -impl

Shared

-where - P: Unpark, -{ - pub(super) fn new(unpark: P) -> Shared

{ +impl Shared { + pub(super) fn new(unpark: Unparker) -> Shared { Shared { unpark, pending_drop: task::TransferStack::new(), @@ -52,19 +47,16 @@ where self.unpark.unpark(); } - fn slices(&self) -> &slice::Set

{ + fn slices(&self) -> &slice::Set { unsafe { &*self.slices } } - pub(super) fn set_slices_ptr(&mut self, slices: *const slice::Set

) { + pub(super) fn set_slices_ptr(&mut self, slices: *const slice::Set) { self.slices = slices; } } -impl

Schedule for Shared

-where - P: Unpark, -{ +impl Schedule for Shared { fn bind(&self, task: &Task) { // Get access to the Owned component. This function can only be called // when on the worker. diff --git a/tokio/src/runtime/thread_pool/slice.rs b/tokio/src/runtime/thread_pool/slice.rs index 4b3ef996104..aa521a15e52 100644 --- a/tokio/src/runtime/thread_pool/slice.rs +++ b/tokio/src/runtime/thread_pool/slice.rs @@ -3,7 +3,8 @@ //! slice. use crate::loom::rand::seed; -use crate::runtime::park::Unpark; +use crate::park::Park; +use crate::runtime::Parker; use crate::runtime::thread_pool::{current, queue, Idle, Owned, Shared}; use crate::task::{self, JoinHandle, Task}; use crate::util::{CachePadded, FastRand}; @@ -11,48 +12,38 @@ use crate::util::{CachePadded, FastRand}; use std::cell::UnsafeCell; use std::future::Future; -pub(super) struct Set

-where - P: 'static, -{ +pub(super) struct Set { /// Data accessible from all workers. - shared: Box<[Shared

]>, + shared: Box<[Shared]>, /// Data owned by the worker. - owned: Box<[UnsafeCell>>]>, + owned: Box<[UnsafeCell>]>, /// Submit work to the pool while *not* currently on a worker thread. - inject: queue::Inject>, + inject: queue::Inject, /// Coordinates idle workers idle: Idle, } -unsafe impl Send for Set

{} -unsafe impl Sync for Set

{} +unsafe impl Send for Set {} +unsafe impl Sync for Set {} -impl

Set

-where - P: Unpark, -{ +impl Set { /// Create a new worker set using the provided queues. - pub(crate) fn new(num_workers: usize, mut mk_unpark: F) -> Self - where - F: FnMut(usize) -> P, - { - assert!(num_workers > 0); + pub(crate) fn new(parkers: &[Parker]) -> Self { + assert!(!parkers.is_empty()); - let queues = queue::build(num_workers); + let queues = queue::build(parkers.len()); let inject = queues[0].injector(); let mut shared = Vec::with_capacity(queues.len()); let mut owned = Vec::with_capacity(queues.len()); for (i, queue) in queues.into_iter().enumerate() { - let unpark = mk_unpark(i); let rand = FastRand::new(seed()); - shared.push(Shared::new(unpark)); + shared.push(Shared::new(parkers[i].unpark())); owned.push(UnsafeCell::new(CachePadded::new(Owned::new(queue, rand)))); } @@ -60,12 +51,21 @@ where shared: shared.into_boxed_slice(), owned: owned.into_boxed_slice(), inject, - idle: Idle::new(num_workers), - // blocking, + idle: Idle::new(parkers.len()), } } - fn inject_task(&self, task: Task>) { + pub(crate) fn spawn_typed(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (task, handle) = task::joinable(future); + self.schedule(task); + handle + } + + fn inject_task(&self, task: Task) { self.inject.push(task, |res| { if let Err(task) = res { task.shutdown(); @@ -95,7 +95,7 @@ where } } - pub(crate) fn schedule(&self, task: Task>) { + pub(crate) fn schedule(&self, task: Task) { current::get(|current_worker| match current_worker.as_member(self) { Some(worker) => { if worker.submit_local(task) { @@ -136,28 +136,26 @@ where self.shared.len() } - pub(super) fn index_of(&self, shared: &Shared

) -> usize { + pub(super) fn index_of(&self, shared: &Shared) -> usize { use std::mem; - let size = mem::size_of::>(); + let size = mem::size_of::(); ((shared as *const _ as usize) - (&self.shared[0] as *const _ as usize)) / size } - pub(super) fn shared(&self) -> &[Shared

] { + pub(super) fn shared(&self) -> &[Shared] { &self.shared } - pub(super) fn owned(&self) -> &[UnsafeCell>>] { + pub(super) fn owned(&self) -> &[UnsafeCell>] { &self.owned } pub(super) fn idle(&self) -> &Idle { &self.idle } -} -impl Set

{ /// Wait for all locks on the injection queue to drop. /// /// This is done by locking w/o doing anything. @@ -166,21 +164,9 @@ impl Set

{ } } -impl Drop for Set

{ +impl Drop for Set { fn drop(&mut self) { // Before proceeding, wait for all concurrent wakers to exit self.wait_for_unlocked(); } } - -impl Set> { - pub(crate) fn spawn_typed(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (task, handle) = task::joinable(future); - self.schedule(task); - handle - } -} diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs index 4773ea9ada6..4fccad966df 100644 --- a/tokio/src/runtime/thread_pool/spawner.rs +++ b/tokio/src/runtime/thread_pool/spawner.rs @@ -1,5 +1,4 @@ use crate::loom::sync::Arc; -use crate::runtime::park::Unpark; use crate::runtime::thread_pool::slice; use crate::task::JoinHandle; @@ -20,11 +19,11 @@ use std::future::Future; /// [`ThreadPool::spawner`]: struct.ThreadPool.html#method.spawner #[derive(Clone)] pub(crate) struct Spawner { - workers: Arc>>, + workers: Arc, } impl Spawner { - pub(super) fn new(workers: Arc>>) -> Spawner { + pub(super) fn new(workers: Arc) -> Spawner { Spawner { workers } } @@ -46,7 +45,7 @@ impl Spawner { } /// Reference to the worker set. Used by `ThreadPool` to initiate shutdown. 
- pub(super) fn workers(&self) -> &slice::Set> { + pub(super) fn workers(&self) -> &slice::Set { &*self.workers } } diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index b982e24ec6d..81e292d6e36 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -1,14 +1,12 @@ +use crate::runtime::{self, Runtime}; use crate::runtime::tests::loom_oneshot as oneshot; -use crate::runtime::thread_pool::ThreadPool; -use crate::runtime::{Park, Unpark}; use crate::spawn; use loom::sync::atomic::{AtomicBool, AtomicUsize}; -use loom::sync::{Arc, Mutex, Notify}; +use loom::sync::{Arc, Mutex}; use std::future::Future; use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -use std::time::Duration; #[test] fn pool_multi_spawn() { @@ -46,7 +44,7 @@ fn pool_multi_spawn() { #[test] fn only_blocking() { loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); let (block_tx, block_rx) = oneshot::channel(); pool.spawn(async move { @@ -56,7 +54,7 @@ fn only_blocking() { }); block_rx.recv(); - pool.shutdown_now(); + drop(pool); }); } @@ -64,7 +62,7 @@ fn only_blocking() { fn blocking_and_regular() { const NUM: usize = 3; loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); let cnt = Arc::new(AtomicUsize::new(0)); let (block_tx, block_rx) = oneshot::channel(); @@ -91,7 +89,7 @@ fn blocking_and_regular() { done_rx.recv(); block_rx.recv(); - pool.shutdown_now(); + drop(pool); }); } @@ -153,7 +151,7 @@ fn complete_block_on_under_load() { use futures::FutureExt; loom::model(|| { - let pool = mk_pool(2); + let mut pool = mk_pool(2); pool.block_on({ futures::future::lazy(|_| ()).then(|_| { @@ -171,20 +169,11 @@ fn complete_block_on_under_load() { } fn mk_pool(num_threads: usize) -> Runtime { - use crate::blocking::BlockingPool; - - let blocking_pool = BlockingPool::new("test".into(), None); - let executor = ThreadPool::new( - num_threads, 
- blocking_pool.spawner().clone(), - Arc::new(Box::new(|_, next| next())), - move |_| LoomPark::new(), - ); - - Runtime { - executor, - blocking_pool, - } + runtime::Builder::new() + .threaded_scheduler() + .num_threads(num_threads) + .build() + .unwrap() } use futures::future::poll_fn; @@ -244,69 +233,3 @@ fn gated2(thread: bool) -> impl Future { } }) } - -/// Fake runtime -struct Runtime { - executor: ThreadPool, - #[allow(dead_code)] - blocking_pool: crate::blocking::BlockingPool, -} - -use std::ops; - -impl ops::Deref for Runtime { - type Target = ThreadPool; - - fn deref(&self) -> &ThreadPool { - &self.executor - } -} - -impl ops::DerefMut for Runtime { - fn deref_mut(&mut self) -> &mut ThreadPool { - &mut self.executor - } -} - -struct LoomPark { - notify: Arc, -} - -struct LoomUnpark { - notify: Arc, -} - -impl LoomPark { - fn new() -> LoomPark { - LoomPark { - notify: Arc::new(Notify::new()), - } - } -} - -impl Park for LoomPark { - type Unpark = LoomUnpark; - - type Error = (); - - fn unpark(&self) -> LoomUnpark { - let notify = self.notify.clone(); - LoomUnpark { notify } - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.notify.wait(); - Ok(()) - } - - fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> { - self.notify.wait(); - Ok(()) - } -} - -impl Unpark for LoomUnpark { - fn unpark(&self) { - self.notify.notify(); - } -} diff --git a/tokio/src/runtime/thread_pool/tests/loom_queue.rs b/tokio/src/runtime/thread_pool/tests/loom_queue.rs index d0598c3eb94..a4e10620164 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_queue.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_queue.rs @@ -64,5 +64,6 @@ fn multi_worker() { } fn val(num: u32) -> Task { - task::background(async move { num }) + let (task, _) = task::joinable(async move { num }); + task } diff --git a/tokio/src/runtime/thread_pool/tests/mod.rs b/tokio/src/runtime/thread_pool/tests/mod.rs index dc1d3158577..6638c55870f 100644 --- 
a/tokio/src/runtime/thread_pool/tests/mod.rs +++ b/tokio/src/runtime/thread_pool/tests/mod.rs @@ -4,8 +4,5 @@ mod loom_pool; #[cfg(loom)] mod loom_queue; -#[cfg(not(loom))] -mod pool; - #[cfg(not(loom))] mod queue; diff --git a/tokio/src/runtime/thread_pool/tests/pool.rs b/tokio/src/runtime/thread_pool/tests/pool.rs deleted file mode 100644 index 25c11ea9d30..00000000000 --- a/tokio/src/runtime/thread_pool/tests/pool.rs +++ /dev/null @@ -1,206 +0,0 @@ -#![warn(rust_2018_idioms)] - -use crate::blocking; -use crate::runtime::thread_pool::ThreadPool; -use crate::runtime::{Park, Unpark}; - -use futures::future::poll_fn; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::atomic::*; -use std::sync::{mpsc, Arc}; -use std::task::{Context, Poll, Waker}; -use std::time::Duration; - -#[test] -fn eagerly_drops_futures() { - use std::sync::{mpsc, Mutex}; - - struct MyPark { - rx: mpsc::Receiver<()>, - tx: Mutex>, - #[allow(dead_code)] - park_tx: mpsc::SyncSender<()>, - unpark_tx: mpsc::SyncSender<()>, - } - - impl Park for MyPark { - type Unpark = MyUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MyUnpark { - tx: Mutex::new(self.tx.lock().unwrap().clone()), - unpark_tx: self.unpark_tx.clone(), - } - } - - fn park(&mut self) -> Result<(), Self::Error> { - let _ = self.rx.recv(); - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - let _ = self.rx.recv_timeout(duration); - Ok(()) - } - } - - struct MyUnpark { - tx: Mutex>, - #[allow(dead_code)] - unpark_tx: mpsc::SyncSender<()>, - } - - impl Unpark for MyUnpark { - fn unpark(&self) { - let _ = self.tx.lock().unwrap().send(()); - } - } - - let (task_tx, task_rx) = mpsc::channel(); - let (drop_tx, drop_rx) = mpsc::channel(); - let (park_tx, park_rx) = mpsc::sync_channel(0); - let (unpark_tx, unpark_rx) = mpsc::sync_channel(0); - - let blocking_pool = blocking::BlockingPool::new("test".into(), None); - - let pool = 
ThreadPool::new( - 4, - blocking_pool.spawner().clone(), - Arc::new(Box::new(|_, next| next())), - move |_| { - let (tx, rx) = mpsc::channel(); - MyPark { - tx: Mutex::new(tx), - rx, - park_tx: park_tx.clone(), - unpark_tx: unpark_tx.clone(), - } - }, - ); - - struct MyTask { - task_tx: Option>, - drop_tx: mpsc::Sender<()>, - } - - impl Future for MyTask { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if let Some(tx) = self.get_mut().task_tx.take() { - tx.send(cx.waker().clone()).unwrap(); - } - - Poll::Pending - } - } - - impl Drop for MyTask { - fn drop(&mut self) { - self.drop_tx.send(()).unwrap(); - } - } - - pool.spawn(MyTask { - task_tx: Some(task_tx), - drop_tx, - }); - - // Wait until we get the task handle. - let task = task_rx.recv().unwrap(); - - // Drop the pool, this should result in futures being forcefully dropped. - drop(pool); - - // Make sure `MyPark` and `MyUnpark` were dropped during shutdown. - assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)); - assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)); - - // If the future is forcefully dropped, then we will get a signal here. - drop_rx.recv().unwrap(); - - // Ensure `task` lives until after the test completes. 
- drop(task); -} - -#[test] -fn park_called_at_interval() { - struct MyPark { - park_light: Arc, - } - - struct MyUnpark {} - - impl Park for MyPark { - type Unpark = MyUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MyUnpark {} - } - - fn park(&mut self) -> Result<(), Self::Error> { - use std::thread; - use std::time::Duration; - - thread::sleep(Duration::from_millis(1)); - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - if duration == Duration::from_millis(0) { - self.park_light.store(true, Relaxed); - Ok(()) - } else { - self.park() - } - } - } - - impl Unpark for MyUnpark { - fn unpark(&self) {} - } - - let park_light_1 = Arc::new(AtomicBool::new(false)); - let park_light_2 = park_light_1.clone(); - - let (done_tx, done_rx) = mpsc::channel(); - - let blocking_pool = blocking::BlockingPool::new("test".into(), None); - - let pool = ThreadPool::new( - 1, - blocking_pool.spawner().clone(), - Arc::new(Box::new(|_, next| next())), - move |idx| { - assert_eq!(idx, 0); - MyPark { - park_light: park_light_2.clone(), - } - }, - ); - - let mut cnt = 0; - - pool.spawn(poll_fn(move |cx| { - let did_park_light = park_light_1.load(Relaxed); - - if did_park_light { - // There is a bit of a race where the worker can tick a few times - // before seeing the task - assert!(cnt > 50); - done_tx.send(()).unwrap(); - return Poll::Ready(()); - } - - cnt += 1; - - cx.waker().wake_by_ref(); - Poll::Pending - })); - - done_rx.recv().unwrap(); -} diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index cf6b66d84b1..92f3cfbd838 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -1,8 +1,9 @@ -use crate::blocking; use crate::loom::cell::CausalCell; use crate::loom::sync::Arc; -use crate::runtime::park::{Park, Unpark}; -use crate::runtime::thread_pool::{current, shutdown, slice, Callback, Owned, Shared, Spawner}; +use 
crate::park::Park; +use crate::runtime::{self, blocking}; +use crate::runtime::park::Parker; +use crate::runtime::thread_pool::{current, slice, Owned, Shared, Spawner}; use crate::task::Task; use std::cell::Cell; @@ -37,23 +38,17 @@ cfg_blocking! { } } -pub(crate) struct Worker { +pub(crate) struct Worker { /// Parks the thread. Requires the calling worker to have obtained unique /// access via the generation synchronization action. - inner: Arc>, + inner: Arc, /// Scheduler slices - slices: Arc>, + slices: Arc, /// Slice assigned to this worker index: usize, - /// Handle to the blocking pool - blocking_pool: blocking::Spawner, - - /// Run before calling worker logic - around_worker: Callback, - /// Worker generation. This is used to synchronize access to the internal /// data. generation: usize, @@ -64,21 +59,17 @@ pub(crate) struct Worker { /// Internal worker state. This may be referenced from multiple threads, but the /// generation guard protects unsafe access -struct Inner { +struct Inner { /// Used to park the thread - park: CausalCell

, - - /// Only held so that the scheduler can be signaled on shutdown. - shutdown_tx: shutdown::Sender, + park: CausalCell, } -// TODO: clean up -unsafe impl Send for Worker

{} +unsafe impl Send for Worker {} /// Used to ensure the invariants are respected -struct GenerationGuard<'a, P: Park + 'static> { +struct GenerationGuard<'a> { /// Worker reference - worker: &'a Worker

, + worker: &'a Worker, /// Prevent `Sync` access _p: PhantomData>, @@ -87,38 +78,28 @@ struct GenerationGuard<'a, P: Park + 'static> { struct WorkerGone; // TODO: Move into slices -pub(super) fn create_set( +pub(super) fn create_set( pool_size: usize, - mk_park: F, - blocking_pool: blocking::Spawner, - around_worker: Callback, - shutdown_tx: shutdown::Sender, -) -> (Arc>, Vec>) -where - P: Send + Park, - F: FnMut(usize) -> P, -{ + parker: Parker, +) -> (Arc, Vec) { // Create the parks... - let parks: Vec<_> = (0..pool_size).map(mk_park).collect(); + let parkers: Vec<_> = (0..pool_size).map(|_| parker.clone()).collect(); - let mut slices = Arc::new(slice::Set::new(pool_size, |i| parks[i].unpark())); + let mut slices = Arc::new(slice::Set::new(&parkers)); // Establish the circular link between the individual worker state // structure and the container. Arc::get_mut(&mut slices).unwrap().set_ptr(); // This will contain each worker. - let workers = parks + let workers = parkers .into_iter() .enumerate() - .map(|(index, park)| { + .map(|(index, parker)| { Worker::new( slices.clone(), index, - park, - blocking_pool.clone(), - around_worker.clone(), - shutdown_tx.clone(), + parker, ) }) .collect(); @@ -132,114 +113,94 @@ where /// The number is fairly arbitrary. I believe this value was copied from golang. const GLOBAL_POLL_INTERVAL: u16 = 61; -impl

Worker

-where - P: Send + Park, -{ +impl Worker { // Safe as aquiring a lock is required before doing anything potentially // dangerous. pub(super) fn new( - slices: Arc>, + slices: Arc, index: usize, - park: P, - blocking_pool: blocking::Spawner, - around_worker: Callback, - shutdown_tx: shutdown::Sender, + park: Parker, ) -> Self { Worker { inner: Arc::new(Inner { park: CausalCell::new(park), - shutdown_tx, }), slices, index, - blocking_pool, - around_worker, generation: 0, gone: Cell::new(false), } } - pub(super) fn run(self) - where - P: Park>, - { - (self.around_worker)(self.index, &mut || { - // First, acquire a lock on the worker. - let guard = match self.acquire_lock() { - Some(guard) => guard, - None => return, - }; - - let spawner = Spawner::new(self.slices.clone()); - - // Track the current worker - current::set(&self.slices, self.index, || { - // Enter a runtime context - let _enter = crate::runtime::enter(); - - crate::runtime::global::with_thread_pool(&spawner, || { - self.blocking_pool.enter(|| { - ON_BLOCK.with(|ob| { - // Ensure that the ON_BLOCK is removed from the thread-local context - // when leaving the scope. This handles cases that involve panicking. - struct Reset<'a>(&'a Cell>); - - impl<'a> Drop for Reset<'a> { - fn drop(&mut self) { - self.0.set(None); - } + pub(super) fn run(self, blocking_pool: blocking::Spawner) { + // First, acquire a lock on the worker. + let guard = match self.acquire_lock() { + Some(guard) => guard, + None => return, + }; + + let spawner = Spawner::new(self.slices.clone()); + + // Track the current worker + current::set(&self.slices, self.index, || { + // Enter a runtime context + let _enter = crate::runtime::enter(); + + crate::runtime::global::with_thread_pool(&spawner, || { + blocking_pool.enter(|| { + ON_BLOCK.with(|ob| { + // Ensure that the ON_BLOCK is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. 
+ struct Reset<'a>(&'a Cell>); + + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.0.set(None); } + } - let _reset = Reset(ob); + let _reset = Reset(ob); - let allow_blocking: &dyn Fn() = &|| self.block_in_place(); + let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool); - ob.set(Some(unsafe { - // NOTE: We cannot use a safe cast to raw pointer here, since we are - // _also_ erasing the lifetime of these pointers. That is safe here, - // because we know that ob will set back to None before allow_blocking - // is dropped. - #[allow(clippy::useless_transmute)] - std::mem::transmute::<_, *const dyn Fn()>(allow_blocking) - })); + ob.set(Some(unsafe { + // NOTE: We cannot use a safe cast to raw pointer here, since we are + // _also_ erasing the lifetime of these pointers. That is safe here, + // because we know that ob will set back to None before allow_blocking + // is dropped. + #[allow(clippy::useless_transmute)] + std::mem::transmute::<_, *const dyn Fn()>(allow_blocking) + })); - let _ = guard.run(); + let _ = guard.run(); - // Ensure that we reset ob before allow_blocking is dropped. - drop(_reset); - }); - }) + // Ensure that we reset ob before allow_blocking is dropped. + drop(_reset); + }); }) - }); - - if self.gone.get() { - // Synchronize with the pool for load(Acquire) in is_closed to get - // up-to-date value. - self.slices.wait_for_unlocked(); - - if self.slices.is_closed() { - // If the pool is shutting down, some other thread may be - // waiting to clean up after the task that we were holding on - // to. If we completed that task, we did nothing (because - // task.run() returned None), and so crucially we did not wait - // up any such thread. - // - // So, we have to do that here. - self.slices.notify_all(); - } - } + }) }); - // We have to drop the `shutdown_tx` handle last to ensure expected - // ordering. 
- let shutdown_tx = self.inner.shutdown_tx.clone(); - drop(self); - drop(shutdown_tx); + if self.gone.get() { + // Synchronize with the pool for load(Acquire) in is_closed to get + // up-to-date value. + self.slices.wait_for_unlocked(); + + if self.slices.is_closed() { + // If the pool is shutting down, some other thread may be + // waiting to clean up after the task that we were holding on + // to. If we completed that task, we did nothing (because + // task.run() returned None), and so crucially we did not wait + // up any such thread. + // + // So, we have to do that here. + self.slices.notify_all(); + } + } } /// Acquire the lock - fn acquire_lock(&self) -> Option> { + fn acquire_lock(&self) -> Option> { // Safety: Only getting `&self` access to access atomic field let owned = unsafe { &*self.slices.owned()[self.index].get() }; @@ -262,10 +223,7 @@ where } /// Enter an in-place blocking section - fn block_in_place(&self) - where - P: Park>, - { + fn block_in_place(&self, blocking_pool: &blocking::Spawner) { // If our Worker has already been given away, then blocking is fine! if self.gone.get() { return; @@ -313,21 +271,17 @@ where inner: self.inner.clone(), slices: self.slices.clone(), index: self.index, - blocking_pool: self.blocking_pool.clone(), - around_worker: self.around_worker.clone(), generation: self.generation + 1, gone: Cell::new(false), }; // Give away the worker - self.blocking_pool.spawn_background(move || worker.run()); + let b = blocking_pool.clone(); + runtime::spawn_blocking(move || worker.run(b)); } } -impl

GenerationGuard<'_, P> -where - P: Park + 'static, -{ +impl GenerationGuard<'_> { fn run(self) -> Result<(), WorkerGone> { let mut me = self; @@ -392,7 +346,7 @@ where } /// Find local work - fn find_local_work(&mut self) -> Option>> { + fn find_local_work(&mut self) -> Option> { let tick = self.tick_fetch_inc(); if tick % GLOBAL_POLL_INTERVAL == 0 { @@ -413,7 +367,7 @@ where } } - fn steal_work(&mut self) -> Option>> { + fn steal_work(&mut self) -> Option> { let num_slices = self.worker.slices.len(); let start = self.owned().rand.fastrand_n(num_slices as u32); @@ -501,7 +455,7 @@ where /// Runs the task. During the task execution, it is possible for worker to /// transition to a new thread. In this case, the caller loses the guard to /// access the generation and must stop processing. - fn run_task(mut self, task: Task>) -> Result { + fn run_task(mut self, task: Task) -> Result { if self.is_searching() { self.transition_from_searching(); } @@ -553,7 +507,7 @@ where // calling the parker. This is done in a loop as spurious wakeups are // permitted. loop { - self.park_mut().park().ok().expect("park failed"); + self.park_mut().park().expect("park failed"); // We might have been woken to clean up a dropped task self.maintenance(); @@ -571,7 +525,6 @@ where self.park_mut() .park_timeout(Duration::from_millis(0)) - .ok() .expect("park failed"); self.owned().defer_notification.set(false); @@ -617,7 +570,7 @@ where // `transition_to_parked` is not called as we are not working // anymore. When a task is released, the owning worker is unparked // directly. 
- self.park_mut().park().ok().expect("park failed"); + self.park_mut().park().expect("park failed"); // Try draining more tasks self.drain_tasks_pending_drop(); @@ -639,21 +592,21 @@ where self.worker.index } - fn slices(&self) -> &slice::Set { + fn slices(&self) -> &slice::Set { &self.worker.slices } - fn shared(&self) -> &Shared { + fn shared(&self) -> &Shared { &self.slices().shared()[self.index()] } - fn owned(&self) -> &Owned { + fn owned(&self) -> &Owned { let index = self.index(); // safety: we own the slot unsafe { &*self.slices().owned()[index].get() } } - fn park_mut(&mut self) -> &mut P { + fn park_mut(&mut self) -> &mut Parker { // Safety: `&mut self` on `GenerationGuard` implies it is safe to // perform the action. unsafe { self.worker.inner.park.with_mut(|ptr| &mut *ptr) } diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs index b72971f869d..1cd58cd6f5c 100644 --- a/tokio/src/runtime/time.rs +++ b/tokio/src/runtime/time.rs @@ -3,36 +3,49 @@ //! shells. This isolates the complexity of dealing with conditional //! compilation. -cfg_time! 
{ +pub(crate) use variant::*; + +#[cfg(all(feature = "time", not(loom)))] +mod variant { + use crate::park::Either; use crate::runtime::io; use crate::time::{self, driver}; pub(crate) type Clock = time::Clock; - pub(crate) type Driver = driver::Driver; - pub(crate) type Handle = driver::Handle; + pub(crate) type Driver = Either, io::Driver>; + pub(crate) type Handle = Option; pub(crate) fn create_clock() -> Clock { Clock::new() } /// Create a new timer driver / handle pair - pub(crate) fn create_driver(io_driver: io::Driver, clock: Clock) -> (Driver, Handle) { - let driver = driver::Driver::new(io_driver, clock); - let handle = driver.handle(); + pub(crate) fn create_driver( + enable: bool, + io_driver: io::Driver, + clock: Clock, + ) -> (Driver, Handle) { + if enable { + let driver = driver::Driver::new(io_driver, clock); + let handle = driver.handle(); - (driver, handle) + (Either::A(driver), Some(handle)) + } else { + (Either::B(io_driver), None) + } } pub(crate) fn with_default(handle: &Handle, clock: &Clock, f: F) -> R where F: FnOnce() -> R, { - let _time = driver::set_default(handle); + let _time = handle.as_ref().map(|handle| driver::set_default(handle)); clock.enter(f) } } -cfg_not_time! { +#[cfg(any(not(feature = "time"), loom))] +mod variant { use crate::runtime::io; pub(crate) type Clock = (); @@ -44,7 +57,11 @@ cfg_not_time! { } /// Create a new timer driver / handle pair - pub(crate) fn create_driver(io_driver: io::Driver, _clock: Clock) -> (Driver, Handle) { + pub(crate) fn create_driver( + _enable: bool, + io_driver: io::Driver, + _clock: Clock, + ) -> (Driver, Handle) { (io_driver, ()) } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 3ee563c3657..2954eb85f43 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -1,3 +1,5 @@ +#![cfg_attr(loom, allow(dead_code, unreachable_pub, unused_imports))] + //! Future-aware synchronization //! //! This module is enabled with the **`sync`** feature flag. 
@@ -33,22 +35,18 @@ cfg_sync! { } cfg_not_sync! { - cfg_atomic_waker! { + cfg_resource_drivers! { mod task; pub(crate) use task::AtomicWaker; } - cfg_rt_threaded! { + cfg_rt_core! { pub(crate) mod oneshot; } cfg_signal! { pub(crate) mod mpsc; pub(crate) mod semaphore; - - cfg_not_rt_threaded! { - pub(crate) mod oneshot; - } } } diff --git a/tokio/src/task/blocking.rs b/tokio/src/task/blocking.rs index 4445bb729c1..1b79ee1749f 100644 --- a/tokio/src/task/blocking.rs +++ b/tokio/src/task/blocking.rs @@ -1,4 +1,3 @@ -use crate::blocking; use crate::task::JoinHandle; cfg_rt_threaded! { @@ -60,6 +59,6 @@ cfg_blocking! { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - blocking::spawn_blocking(f) + crate::runtime::spawn_blocking(f) } } diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index 6a619cff956..dc98b6d48bf 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -291,20 +291,6 @@ pub(crate) trait Schedule: Send + Sync + Sized + 'static { fn schedule(&self, task: Task); } -cfg_rt_threaded! { - /// Create a new task without an associated join handle - pub(crate) fn background(task: T) -> Task - where - T: Future + Send + 'static, - S: Schedule, - { - Task { - raw: RawTask::new_background::<_, S>(task), - _p: PhantomData, - } - } -} - /// Create a new task with an associated join handle pub(crate) fn joinable(task: T) -> (Task, JoinHandle) where diff --git a/tokio/src/task/raw.rs b/tokio/src/task/raw.rs index c1879344ab2..9d439f11ec5 100644 --- a/tokio/src/task/raw.rs +++ b/tokio/src/task/raw.rs @@ -54,18 +54,6 @@ pub(super) fn vtable() -> &'static Vtable { } } -cfg_rt_threaded! 
{ - impl RawTask { - pub(super) fn new_background(task: T) -> RawTask - where - T: Future + Send + 'static, - S: Schedule, - { - RawTask::new::<_, S>(task, State::new_background()) - } - } -} - impl RawTask { pub(super) fn new_joinable(task: T) -> RawTask where diff --git a/tokio/src/task/stack.rs b/tokio/src/task/stack.rs index e2dd3838cd6..36ebb797e79 100644 --- a/tokio/src/task/stack.rs +++ b/tokio/src/task/stack.rs @@ -28,7 +28,7 @@ impl TransferStack { // At this point, the queue_next field may also be used to track // whether or not the task must drop the join waker. - debug_assert_eq!(0, next & 1); + debug_assert_eq!(0, next & !1); // We don't care about any memory associated w/ setting the `head` // field, just the current value. diff --git a/tokio/src/task/state.rs b/tokio/src/task/state.rs index a9284ac9eed..dadfe5401db 100644 --- a/tokio/src/task/state.rs +++ b/tokio/src/task/state.rs @@ -57,14 +57,6 @@ const INITIAL_STATE: usize = NOTIFIED; /// All transitions are performed via RMW operations. This establishes an /// unambiguous modification order. 
impl State { - /// Starts with a ref count of 1 - #[cfg(feature = "rt-threaded")] - pub(super) fn new_background() -> State { - State { - val: AtomicUsize::new(INITIAL_STATE), - } - } - /// Starts with a ref count of 2 pub(super) fn new_joinable() -> State { State { diff --git a/tokio/src/task/tests/task.rs b/tokio/src/task/tests/task.rs index 9121c7df596..8f5fec1bf23 100644 --- a/tokio/src/task/tests/task.rs +++ b/tokio/src/task/tests/task.rs @@ -25,7 +25,7 @@ fn create_complete_drop() { tx.send(1).unwrap(); }); - let task = task::background(task); + let (task, _) = task::joinable(task); let mock = mock().bind(&task).release_local(); let mock = &mut || Some(From::from(&mock)); @@ -50,7 +50,7 @@ fn create_yield_complete_drop() { tx.send(1).unwrap(); }); - let task = task::background(task); + let (task, _) = task::joinable(task); let mock = mock().bind(&task).release_local(); let mock = || Some(From::from(&mock)); @@ -80,7 +80,7 @@ fn create_clone_yield_complete_drop() { tx.send(1).unwrap(); }); - let task = task::background(task); + let (task, _) = task::joinable(task); let mock = mock().bind(&task).release_local(); let mock = || Some(From::from(&mock)); @@ -107,7 +107,7 @@ fn create_wake_drop() { let (task, did_drop) = track_drop(async move { rx.await }); - let task = task::background(task); + let (task, _) = task::joinable(task); let mock = mock().bind(&task).schedule().release_local(); @@ -140,7 +140,7 @@ fn notify_complete() { .await; }); - let task = task::background(task); + let (task, _) = task::joinable(task); let mock = mock().bind(&task).release_local(); let mock = &mut || Some(From::from(&mock)); @@ -159,7 +159,7 @@ fn complete_on_second_schedule_obj() { tx.send(1).unwrap(); }); - let task = task::background(task); + let (task, _) = task::joinable(task); let mock1 = mock(); let mock2 = mock().bind(&task).release(); @@ -396,7 +396,7 @@ fn task_panic_background() { "hello" }); - let task = task::background(task); + let (task, _) = task::joinable(task); let 
mock = mock().bind(&task).release_local(); diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 74309524c0f..f5c5c5c6cec 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -16,7 +16,7 @@ mod stack; use self::stack::Stack; use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; -use crate::runtime::{Park, Unpark}; +use crate::park::{Park, Unpark}; use crate::time::{wheel, Error}; use crate::time::{Clock, Duration, Instant}; diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index 50b6a8feabf..fd2c552c565 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -1,3 +1,5 @@ +#![cfg(not(loom))] + //! Utilities for tracking time. //! //! This module provides a number of types for executing code after a set period diff --git a/tokio/src/time/tests/mock_clock.rs b/tokio/src/time/tests/mock_clock.rs index e38cbfa6da9..966952f8406 100644 --- a/tokio/src/time/tests/mock_clock.rs +++ b/tokio/src/time/tests/mock_clock.rs @@ -1,4 +1,4 @@ -use crate::runtime::{Park, Unpark}; +use crate::park::{Park, Unpark}; use crate::time::driver::{self, Driver}; use crate::time::{Clock, Duration, Instant}; diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index a50b33ac37e..7b4a12e58e9 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -9,4 +9,7 @@ cfg_rt_threaded! 
{ mod rand; pub(crate) use rand::FastRand; + + mod try_lock; + pub(crate) use try_lock::TryLock; } diff --git a/tokio/src/util/try_lock.rs b/tokio/src/util/try_lock.rs new file mode 100644 index 00000000000..493fce882f5 --- /dev/null +++ b/tokio/src/util/try_lock.rs @@ -0,0 +1,63 @@ +use crate::loom::sync::atomic::AtomicBool; + +use std::cell::UnsafeCell; +use std::marker::PhantomData; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::Ordering::SeqCst; + +pub(crate) struct TryLock { + locked: AtomicBool, + data: UnsafeCell, +} + +pub(crate) struct LockGuard<'a, T> { + lock: &'a TryLock, + _p: PhantomData>, +} + +unsafe impl Send for TryLock { } +unsafe impl Sync for TryLock { } + +unsafe impl Sync for LockGuard<'_, T> { } + +impl TryLock { + /// Create a new `TryLock` + pub(crate) fn new(data: T) -> TryLock { + TryLock { + locked: AtomicBool::new(false), + data: UnsafeCell::new(data), + } + } + + /// Attempt to acquire lock + pub(crate) fn try_lock(&self) -> Option> { + if self.locked.compare_exchange(false, true, SeqCst, SeqCst).is_err() { + return None; + } + + Some(LockGuard { + lock: self, + _p: PhantomData, + }) + } +} + +impl Deref for LockGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +impl DerefMut for LockGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.data.get() } + } +} + +impl Drop for LockGuard<'_, T> { + fn drop(&mut self) { + self.lock.locked.store(false, SeqCst); + } +} diff --git a/tokio/tests/io_driver.rs b/tokio/tests/io_driver.rs index a97de8d2acf..ec51373f44e 100644 --- a/tokio/tests/io_driver.rs +++ b/tokio/tests/io_driver.rs @@ -44,7 +44,11 @@ fn test_drop_on_notify() { // shutting down. Then, when the task handle is dropped, the task itself is // dropped. 
- let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); let (addr_tx, addr_rx) = mpsc::channel(); diff --git a/tokio/tests/io_driver_drop.rs b/tokio/tests/io_driver_drop.rs index c75631b73a2..6b923dd3b2c 100644 --- a/tokio/tests/io_driver_drop.rs +++ b/tokio/tests/io_driver_drop.rs @@ -6,7 +6,7 @@ use tokio_test::{assert_err, assert_pending, assert_ready, task}; #[test] fn tcp_doesnt_block() { - let rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let rt = rt(); let mut listener = rt.enter(|| { let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); @@ -24,7 +24,7 @@ fn tcp_doesnt_block() { #[test] fn drop_wakes() { - let rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let rt = rt(); let mut listener = rt.enter(|| { let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); @@ -42,3 +42,11 @@ fn drop_wakes() { assert!(task.is_woken()); assert_ready!(task.poll()); } + +fn rt() -> runtime::Runtime { + runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +} diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs index 5571c199a37..1b8dcc9531a 100644 --- a/tokio/tests/process_issue_42.rs +++ b/tokio/tests/process_issue_42.rs @@ -18,7 +18,11 @@ fn run_test() { let finished_clone = finished.clone(); thread::spawn(move || { - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); let mut futures = FuturesOrdered::new(); rt.block_on(async { diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 039bb22f1e7..700f1029e7c 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -29,6 +29,7 @@ fn spawned_task_does_not_progress_without_block_on() { fn rt() -> Runtime { tokio::runtime::Builder::new() 
.basic_scheduler() + .enable_all() .build() .unwrap() } diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 33f779c70a9..b360158d254 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -10,16 +10,21 @@ macro_rules! rt_test { fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() } } - mod thread_pool { + mod threaded_scheduler { $($t)* fn rt() -> Runtime { - Runtime::new().unwrap() + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() } } } @@ -341,7 +346,7 @@ rt_test! { #[test] fn block_on_socket() { - let mut rt = Runtime::new().unwrap(); + let mut rt = rt(); rt.block_on(async move { let (tx, rx) = oneshot::channel(); @@ -359,6 +364,100 @@ rt_test! { }); } + #[test] + fn spawn_from_blocking() { + let mut rt = rt(); + + let out = rt.block_on(async move { + let inner = assert_ok!(tokio::task::spawn_blocking(|| { + tokio::spawn(async move { "hello" }) + }).await); + + assert_ok!(inner.await) + }); + + assert_eq!(out, "hello") + } + + #[test] + fn delay_from_blocking() { + let mut rt = rt(); + + rt.block_on(async move { + assert_ok!(tokio::task::spawn_blocking(|| { + let now = std::time::Instant::now(); + let dur = Duration::from_millis(1); + + // use the futures' block_on fn to make sure we aren't setting + // any Tokio context + futures::executor::block_on(async { + tokio::time::delay_for(dur).await; + }); + + assert!(now.elapsed() >= dur); + }).await); + }); + } + + #[test] + fn socket_from_blocking() { + let mut rt = rt(); + + rt.block_on(async move { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + + let peer = tokio::task::spawn_blocking(move || { + // use the futures' block_on fn to make sure we aren't setting + // any Tokio context + futures::executor::block_on(async { + assert_ok!(TcpStream::connect(addr).await); + }); + }); + + // Wait for the client to 
connect + let _ = assert_ok!(listener.accept().await); + + assert_ok!(peer.await); + }); + } + + #[test] + fn io_driver_called_when_under_load() { + let mut rt = rt(); + + // Create a lot of constant load. The scheduler will always be busy. + for _ in 0..100 { + rt.spawn(async { + loop { + tokio::task::yield_now().await; + } + }); + } + + // Do some I/O work + rt.block_on(async { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + + let srv = tokio::spawn(async move { + let (mut stream, _) = assert_ok!(listener.accept().await); + assert_ok!(stream.write_all(b"hello world").await); + }); + + let cli = tokio::spawn(async move { + let mut stream = assert_ok!(TcpStream::connect(addr).await); + let mut dst = vec![0; 11]; + + assert_ok!(stream.read_exact(&mut dst).await); + assert_eq!(dst, b"hello world"); + }); + + assert_ok!(srv.await); + assert_ok!(cli.await); + }); + } + #[test] fn client_server_block_on() { let mut rt = rt(); @@ -371,12 +470,11 @@ rt_test! { } #[test] - #[ignore] fn panic_in_task() { - let rt = rt(); - let (tx, rx) = mpsc::channel(); + let mut rt = rt(); + let (tx, rx) = oneshot::channel(); - struct Boom(mpsc::Sender<()>); + struct Boom(Option>); impl Future for Boom { type Output = (); @@ -389,12 +487,12 @@ rt_test! { impl Drop for Boom { fn drop(&mut self) { assert!(::std::thread::panicking()); - self.0.send(()).unwrap(); + self.0.take().unwrap().send(()).unwrap(); } } - rt.spawn(Boom(tx)); - rx.recv().unwrap(); + rt.spawn(Boom(Some(tx))); + assert_ok!(rt.block_on(rx)); } #[test] @@ -428,6 +526,48 @@ rt_test! 
{ assert_ok!(rt.block_on(handle)); } + #[test] + fn eagerly_drops_futures_on_shutdown() { + use std::sync::mpsc; + + struct Never { + drop_tx: mpsc::Sender<()>, + } + + impl Future for Never { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } + } + + impl Drop for Never { + fn drop(&mut self) { + self.drop_tx.send(()).unwrap(); + } + } + + let mut rt = rt(); + + let (drop_tx, drop_rx) = mpsc::channel(); + let (run_tx, run_rx) = oneshot::channel(); + + rt.block_on(async move { + tokio::spawn(async move { + assert_ok!(run_tx.send(())); + + Never { drop_tx }.await + }); + + assert_ok!(run_rx.await); + }); + + drop(rt); + + assert_ok!(drop_rx.recv()); + } + async fn client_server(tx: mpsc::Sender<()>) { let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 8be6d036a09..4b4c880ece5 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -18,6 +18,7 @@ fn single_thread() { // No panic when starting a runtime w/ a single thread let _ = runtime::Builder::new() .threaded_scheduler() + .enable_all() .num_threads(1) .build(); } @@ -189,10 +190,11 @@ fn drop_threadpool_drops_futures() { let rt = runtime::Builder::new() .threaded_scheduler() - .after_start(move || { + .enable_all() + .on_thread_start(move || { a.fetch_add(1, Relaxed); }) - .before_stop(move || { + .on_thread_stop(move || { b.fetch_add(1, Relaxed); }) .build() @@ -218,7 +220,7 @@ fn drop_threadpool_drops_futures() { } #[test] -fn after_start_and_before_stop_is_called() { +fn start_stop_callbacks_called() { use std::sync::atomic::{AtomicUsize, Ordering}; let after_start = Arc::new(AtomicUsize::new(0)); @@ -228,10 +230,11 @@ fn after_start_and_before_stop_is_called() { let before_inner = before_stop.clone(); let mut rt = tokio::runtime::Builder::new() .threaded_scheduler() - .after_start(move || { + .enable_all() + .on_thread_start(move || { 
after_inner.clone().fetch_add(1, Ordering::Relaxed); }) - .before_stop(move || { + .on_thread_stop(move || { before_inner.clone().fetch_add(1, Ordering::Relaxed); }) .build() diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs index 0cb7d48225c..fc790c28d93 100644 --- a/tokio/tests/signal_drop_rt.rs +++ b/tokio/tests/signal_drop_rt.rs @@ -38,6 +38,7 @@ fn dropping_loops_does_not_cause_starvation() { fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() } diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs index 8020c593680..e34e7c098ac 100644 --- a/tokio/tests/signal_multi_rt.rs +++ b/tokio/tests/signal_multi_rt.rs @@ -48,6 +48,7 @@ fn multi_loop() { fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() } diff --git a/tokio/tests/time_rt.rs b/tokio/tests/time_rt.rs index 235d1960574..7de9c345593 100644 --- a/tokio/tests/time_rt.rs +++ b/tokio/tests/time_rt.rs @@ -27,7 +27,11 @@ fn timer_with_threaded_runtime() { fn timer_with_basic_scheduler() { use tokio::runtime::Builder; - let mut rt = Builder::new().basic_scheduler().build().unwrap(); + let mut rt = Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); let (tx, rx) = mpsc::channel(); rt.block_on(async move {