diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs index 0965bb95a..1d573b781 100644 --- a/rayon-core/src/latch.rs +++ b/rayon-core/src/latch.rs @@ -218,6 +218,7 @@ impl<'r> Latch for SpinLatch<'r> { /// A Latch starts as false and eventually becomes true. You can block /// until it becomes true. +#[derive(Debug)] pub(super) struct LockLatch { m: Mutex, v: Condvar, @@ -297,11 +298,9 @@ impl CountLatch { /// Decrements the latch counter by one. If this is the final /// count, then the latch is **set**, and calls to `probe()` will - /// return true. Returns whether the latch was set. This is an - /// internal operation, as it does not tickle, and to fail to - /// tickle would lead to deadlock. + /// return true. Returns whether the latch was set. #[inline] - fn set(&self) -> bool { + pub(super) fn set(&self) -> bool { if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { self.core_latch.set(); true @@ -328,6 +327,41 @@ impl AsCoreLatch for CountLatch { } } +#[derive(Debug)] +pub(super) struct CountLockLatch { + lock_latch: LockLatch, + counter: AtomicUsize, +} + +impl CountLockLatch { + #[inline] + pub(super) fn new() -> CountLockLatch { + CountLockLatch { + lock_latch: LockLatch::new(), + counter: AtomicUsize::new(1), + } + } + + #[inline] + pub(super) fn increment(&self) { + let old_counter = self.counter.fetch_add(1, Ordering::Relaxed); + debug_assert!(old_counter != 0); + } + + pub(super) fn wait(&self) { + self.lock_latch.wait(); + } +} + +impl Latch for CountLockLatch { + #[inline] + fn set(&self) { + if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { + self.lock_latch.set(); + } + } +} + impl<'a, L> Latch for &'a L where L: Latch, diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index e76fab3fa..a8174e3da 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -78,7 +78,7 @@ mod test; pub use self::join::{join, join_context}; pub use self::registry::ThreadBuilder; -pub use self::scope::{scope, Scope}; +pub use 
self::scope::{external_scope, scope, Scope}; pub use self::scope::{scope_fifo, ScopeFifo}; pub use self::spawn::{spawn, spawn_fifo}; pub use self::thread_pool::current_thread_has_pending_tasks; diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 187383cbb..4156b9066 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -164,7 +164,7 @@ static THE_REGISTRY_SET: Once = Once::new(); /// Starts the worker threads (if that has not already happened). If /// initialization has not already occurred, use the default /// configuration. -fn global_registry() -> &'static Arc<Registry> { +pub(super) fn global_registry() -> &'static Arc<Registry> { set_global_registry(|| Registry::new(ThreadPoolBuilder::new())) .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) .expect("The global thread pool has not been initialized.") diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index a41d408e1..db89317b4 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -1,12 +1,13 @@ //! Methods for custom fork-join scopes, created by the [`scope()`] -//! function. These are a more flexible alternative to [`join()`]. +//! and [`external_scope()`] functions. These are a more flexible alternative to [`join()`]. //! //! [`scope()`]: fn.scope.html +//! [`external_scope()`]: fn.external_scope.html //! 
[`join()`]: ../join/join.fn.html use crate::job::{HeapJob, JobFifo}; -use crate::latch::CountLatch; -use crate::registry::{in_worker, Registry, WorkerThread}; +use crate::latch::{CountLatch, CountLockLatch, Latch}; +use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; use crate::unwind; use std::any::Any; use std::fmt; @@ -37,21 +38,31 @@ pub struct ScopeFifo<'scope> { fifos: Vec<JobFifo>, } -struct ScopeBase<'scope> { - /// thread where `scope()` was executed (note that individual jobs - /// may be executing on different worker threads, though they - /// should always be within the same pool of threads) - owner_thread_index: usize, +enum ScopeLatch { + /// A latch for scopes created on a rayon thread which will participate in work- + /// stealing while it waits for completion. This thread is not necessarily part + /// of the same registry as the scope itself! + Stealing { + latch: CountLatch, + registry: Arc<Registry>, + worker_index: usize, + }, + + /// A latch for scopes created on a non-rayon thread which will block to wait. + Blocking { latch: CountLockLatch }, +} - /// thread registry where `scope()` was executed. +struct ScopeBase<'scope> { + /// thread registry where `scope()` was executed or where `external_scope()` + /// should spawn jobs. registry: Arc<Registry>, /// if some job panicked, the error is stored here; it will be /// propagated to the one who created the scope panic: AtomicPtr<Box<dyn Any + Send + 'static>>, - /// latch to set when the counter drops to zero (and hence this scope is complete) - job_completed_latch: CountLatch, + /// latch to track job counts + job_completed_latch: ScopeLatch, /// You can think of a scope as containing a list of closures to execute, /// all of which outlive `'scope`. 
They're not actually required to be @@ -289,8 +300,10 @@ where R: Send, { in_worker(|owner_thread, _| { - let scope = Scope::<'scope>::new(owner_thread); - unsafe { scope.base.complete(owner_thread, || op(&scope)) } + let scope = Scope { + base: ScopeBase::new(owner_thread, owner_thread.registry().clone()), + }; + unsafe { scope.base.complete(Some(owner_thread), || op(&scope)) } }) } @@ -381,17 +394,70 @@ where { in_worker(|owner_thread, _| { let scope = ScopeFifo::<'scope>::new(owner_thread); - unsafe { scope.base.complete(owner_thread, || op(&scope)) } + unsafe { scope.base.complete(Some(owner_thread), || op(&scope)) } }) } -impl<'scope> Scope<'scope> { - fn new(owner_thread: &WorkerThread) -> Self { - Scope { - base: ScopeBase::new(owner_thread), +/// Creates a "fork-join" scope `s` and invokes the closure with a +/// reference to `s`. This closure can then spawn asynchronous tasks +/// into `s`. Those tasks may run asynchronously with respect to the +/// closure; they may themselves spawn additional tasks into `s`. When +/// the closure returns, it will block until all tasks that have been +/// spawned into `s` complete. +/// +/// This is just like `scope()` except the closure runs on the same thread +/// that calls `external_scope()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `external_scope()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `external_scope()` will panic. If multiple panics occur, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn()`, it will +/// execute, even if the spawning task should later panic. `external_scope()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. 
+pub fn external_scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + let worker_thread = WorkerThread::current(); + let (scope_base, thread) = if worker_thread.is_null() { + (ScopeBase::new_external(global_registry().clone()), None) + } else { + unsafe { + ( + ScopeBase::new(&*worker_thread, (*worker_thread).registry().clone()), + Some(&*worker_thread), + ) } - } + }; + let scope = Scope { base: scope_base }; + unsafe { scope.base.complete(thread, || op(&scope)) } +} + +pub(crate) fn do_external_scope<'scope, OP, R>(registry: Arc<Registry>, op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + let worker_thread = WorkerThread::current(); + let (scope_base, thread) = if worker_thread.is_null() { + (ScopeBase::new_external(registry), None) + } else { + unsafe { + ( + ScopeBase::new(&*worker_thread, registry), + Some(&*worker_thread), + ) + } + }; + let scope = Scope { base: scope_base }; + unsafe { scope.base.complete(thread, || op(&scope)) } +} +impl<'scope> Scope<'scope> { /// Spawns a job into the fork-join scope `self`. This job will /// execute sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its @@ -457,6 +523,7 @@ impl<'scope> Scope<'scope> { // Since `Scope` implements `Sync`, we can't be sure that we're still in a // thread of this pool, so we can't just push to the local worker thread. + // Also, this might be an external scope. 
self.base.registry.inject_or_push(job_ref); } } @@ -466,7 +533,7 @@ impl<'scope> ScopeFifo<'scope> { fn new(owner_thread: &WorkerThread) -> Self { let num_threads = owner_thread.registry().num_threads(); ScopeFifo { - base: ScopeBase::new(owner_thread), + base: ScopeBase::new(owner_thread, owner_thread.registry().clone()), fifos: (0..num_threads).map(|_| JobFifo::new()).collect(), } } @@ -511,12 +578,20 @@ impl<'scope> ScopeFifo<'scope> { impl<'scope> ScopeBase<'scope> { /// Creates the base of a new scope for the given worker thread - fn new(owner_thread: &WorkerThread) -> Self { + fn new(owner: &WorkerThread, registry: Arc<Registry>) -> Self { ScopeBase { - owner_thread_index: owner_thread.index(), - registry: owner_thread.registry().clone(), + registry: registry, panic: AtomicPtr::new(ptr::null_mut()), - job_completed_latch: CountLatch::new(), + job_completed_latch: ScopeLatch::new(owner), + marker: PhantomData, + } + } + + fn new_external(registry: Arc<Registry>) -> Self { + ScopeBase { + registry: registry, + panic: AtomicPtr::new(ptr::null_mut()), + job_completed_latch: ScopeLatch::new_blocking(), marker: PhantomData, } } @@ -529,12 +604,13 @@ impl<'scope> ScopeBase<'scope> { /// appropriate. /// /// Unsafe because it must be executed on a worker thread. 
- unsafe fn complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R + unsafe fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R where FUNC: FnOnce() -> R, { let result = self.execute_job_closure(func); - self.steal_till_jobs_complete(owner_thread); + self.job_completed_latch.wait(owner); + self.maybe_propagate_panic(); result.unwrap() // only None if `op` panicked, and that would have been propagated } @@ -560,11 +636,12 @@ impl<'scope> ScopeBase<'scope> { { match unwind::halt_unwinding(func) { Ok(r) => { - self.job_completed_ok(); + self.job_completed_latch.set(); Some(r) } Err(err) => { self.job_panicked(err); + self.job_completed_latch.set(); None } } @@ -581,20 +658,9 @@ impl<'scope> ScopeBase<'scope> { { mem::forget(err); // ownership now transferred into self.panic } - - self.job_completed_latch - .set_and_tickle_one(&self.registry, self.owner_thread_index); } - unsafe fn job_completed_ok(&self) { - self.job_completed_latch - .set_and_tickle_one(&self.registry, self.owner_thread_index); - } - - unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) { - // wait for job counter to reach 0: - owner_thread.wait_until(&self.job_completed_latch); - + unsafe fn maybe_propagate_panic(&self) { // propagate panic, if any occurred; at this point, all // outstanding jobs have completed, so we can use a relaxed // ordering: @@ -606,11 +672,60 @@ impl<'scope> ScopeBase<'scope> { } } +impl ScopeLatch { + fn new(owner: &WorkerThread) -> Self { + ScopeLatch::Stealing { + latch: CountLatch::new(), + registry: owner.registry().clone(), + worker_index: owner.index(), + } + } + + fn new_blocking() -> Self { + ScopeLatch::Blocking { + latch: CountLockLatch::new(), + } + } + + fn increment(&self) { + match self { + ScopeLatch::Stealing { latch, .. 
} => latch.increment(), + ScopeLatch::Blocking { latch } => latch.increment(), + } + } + + fn set(&self) { + match self { + ScopeLatch::Stealing { + latch, + registry, + worker_index, + } => latch.set_and_tickle_one(registry, *worker_index), + ScopeLatch::Blocking { latch } => latch.set(), + } + } + + fn wait(&self, owner: Option<&WorkerThread>) { + match self { + ScopeLatch::Stealing { + latch, + registry, + worker_index, + } => unsafe { + let owner = owner.expect("owner thread"); + debug_assert_eq!(registry.id(), owner.registry().id()); + debug_assert_eq!(*worker_index, owner.index()); + owner.wait_until(latch); + }, + ScopeLatch::Blocking { latch } => latch.wait(), + } + } +} + impl<'scope> fmt::Debug for Scope<'scope> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Scope") .field("pool_id", &self.base.registry.id()) - .field("owner_thread_index", &self.base.owner_thread_index) .field("panic", &self.base.panic) .field("job_completed_latch", &self.base.job_completed_latch) .finish() @@ -622,9 +737,23 @@ impl<'scope> fmt::Debug for ScopeFifo<'scope> { fmt.debug_struct("ScopeFifo") .field("num_fifos", &self.fifos.len()) .field("pool_id", &self.base.registry.id()) - .field("owner_thread_index", &self.base.owner_thread_index) .field("panic", &self.base.panic) .field("job_completed_latch", &self.base.job_completed_latch) .finish() } } + +impl fmt::Debug for ScopeLatch { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ScopeLatch::Stealing { latch, .. 
} => fmt + .debug_tuple("ScopeLatch::Stealing") + .field(latch) + .finish(), + ScopeLatch::Blocking { latch } => fmt + .debug_tuple("ScopeLatch::Blocking") + .field(latch) + .finish(), + } + } +} diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs index 2209f6304..777ca810e 100644 --- a/rayon-core/src/thread_pool/mod.rs +++ b/rayon-core/src/thread_pool/mod.rs @@ -5,6 +5,7 @@ use crate::join; use crate::registry::{Registry, ThreadSpawn, WorkerThread}; +use crate::scope::do_external_scope; use crate::spawn; #[allow(deprecated)] use crate::Configuration; @@ -221,6 +222,18 @@ impl ThreadPool { self.install(|| scope_fifo(op)) } + /// Creates a scope that spawns work into this thread-pool. + /// + /// See also: [the `external_scope()` function][external_scope]. + /// + /// [external_scope]: fn.external_scope.html + pub fn external_scope<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&Scope<'scope>) -> R, + { + do_external_scope(self.registry.clone(), op) + } + /// Spawns an asynchronous task in this thread-pool. This task will /// run in the implicit, global scope, which means that it may outlast /// the current stack frame -- therefore, it cannot capture any references diff --git a/rayon-core/src/thread_pool/test.rs b/rayon-core/src/thread_pool/test.rs index 8d1c90ca1..f2c4d966b 100644 --- a/rayon-core/src/thread_pool/test.rs +++ b/rayon-core/src/thread_pool/test.rs @@ -336,3 +336,18 @@ fn nested_fifo_scopes() { }); assert_eq!(counter.into_inner(), pools.len()); } + +#[test] +fn external_scope_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = &rx; + pool.external_scope(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +}