Skip to content

Commit

Permalink
Merge rayon-rs#636
Browse files Browse the repository at this point in the history
636: Add the ability to customize thread spawning r=nikomatsakis a=cuviper

As an alternative to `ThreadPoolBuilder::build()` and `build_global()`,
the new `spawn()` and `spawn_global()` methods take a closure which will
be responsible for spawning the actual threads. This is called with a
`ThreadBuilder` argument that provides the thread index, name, and stack
size, with the expectation to call its `run()` method in the new thread.

The motivating use cases for this are:
- experimental WASM threading, to be externally implemented.
- scoped threads, like the new test using `scoped_tls`.

Co-authored-by: Josh Stone <[email protected]>
  • Loading branch information
bors[bot] and cuviper committed Jun 5, 2019
2 parents 12afe12 + 249ad3f commit 49fb38a
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 50 deletions.
6 changes: 6 additions & 0 deletions rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ num_cpus = "1.2"
lazy_static = "1"
crossbeam-deque = "0.6.3"
crossbeam-queue = "0.1.2"
crossbeam-utils = "0.6.5"

[dev-dependencies]
rand = "0.6"
rand_xorshift = "0.1"
scoped-tls = "1.0"

[target.'cfg(unix)'.dev-dependencies]
libc = "0.2"
Expand All @@ -49,3 +51,7 @@ path = "tests/scope_join.rs"
[[test]]
name = "simple_panic"
path = "tests/simple_panic.rs"

[[test]]
name = "scoped_threadpool"
path = "tests/scoped_threadpool.rs"
198 changes: 189 additions & 9 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::str::FromStr;

extern crate crossbeam_deque;
extern crate crossbeam_queue;
extern crate crossbeam_utils;
#[cfg(any(debug_assertions, rayon_unstable))]
#[macro_use]
extern crate lazy_static;
Expand All @@ -46,6 +47,8 @@ extern crate rand_xorshift;

#[macro_use]
mod log;
#[macro_use]
mod private;

mod job;
mod join;
Expand All @@ -64,13 +67,16 @@ mod test;
#[cfg(rayon_unstable)]
pub mod internal;
pub use join::{join, join_context};
pub use registry::ThreadBuilder;
pub use scope::{scope, Scope};
pub use scope::{scope_fifo, ScopeFifo};
pub use spawn::{spawn, spawn_fifo};
pub use thread_pool::current_thread_has_pending_tasks;
pub use thread_pool::current_thread_index;
pub use thread_pool::ThreadPool;

use registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};

/// Returns the number of threads in the current registry. If this
/// code is executing within a Rayon thread-pool, then this will be
/// the number of threads for the thread-pool of the current
Expand Down Expand Up @@ -123,8 +129,7 @@ enum ErrorKind {
///
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
#[derive(Default)]
pub struct ThreadPoolBuilder {
pub struct ThreadPoolBuilder<S = DefaultSpawn> {
/// The number of threads in the rayon thread pool.
/// If zero will use the RAYON_NUM_THREADS environment variable.
/// If RAYON_NUM_THREADS is invalid or zero will use the default.
Expand All @@ -146,6 +151,9 @@ pub struct ThreadPoolBuilder {
/// Closure invoked on worker thread exit.
exit_handler: Option<Box<ExitHandler>>,

/// Closure invoked to spawn threads.
spawn_handler: S,

/// If false, worker threads will execute spawned jobs in a
/// "depth-first" fashion. If true, they will do a "breadth-first"
/// fashion. Depth-first is the default.
Expand Down Expand Up @@ -174,12 +182,35 @@ type StartHandler = Fn(usize) + Send + Sync;
/// Note that this same closure may be invoked multiple times in parallel.
type ExitHandler = Fn(usize) + Send + Sync;

// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
impl Default for ThreadPoolBuilder {
fn default() -> Self {
ThreadPoolBuilder {
num_threads: 0,
panic_handler: None,
get_thread_name: None,
stack_size: None,
start_handler: None,
exit_handler: None,
spawn_handler: DefaultSpawn,
breadth_first: false,
}
}
}

impl ThreadPoolBuilder {
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
pub fn new() -> ThreadPoolBuilder {
ThreadPoolBuilder::default()
pub fn new() -> Self {
Self::default()
}
}

/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
impl<S> ThreadPoolBuilder<S>
where
S: ThreadSpawn,
{
/// Create a new `ThreadPool` initialized using this configuration.
pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
ThreadPool::build(self)
Expand Down Expand Up @@ -207,6 +238,154 @@ impl ThreadPoolBuilder {
registry.wait_until_primed();
Ok(())
}
}

impl ThreadPoolBuilder {
/// Create a scoped `ThreadPool` initialized using this configuration.
///
/// This is a convenience function for building a pool using [`crossbeam::scope`]
/// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
/// The threads in this pool will start by calling `wrapper`, which should
/// do initialization and continue by calling `ThreadBuilder::run()`.
///
/// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
///
/// # Examples
///
/// A scoped pool may be useful in combination with scoped thread-local variables.
///
/// ```
/// #[macro_use]
/// extern crate scoped_tls;
/// # use rayon_core as rayon;
///
/// scoped_thread_local!(static POOL_DATA: Vec<i32>);
///
/// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
/// let pool_data = vec![1, 2, 3];
///
/// // We haven't assigned any TLS data yet.
/// assert!(!POOL_DATA.is_set());
///
/// rayon::ThreadPoolBuilder::new()
/// .build_scoped(
/// // Borrow `pool_data` in TLS for each thread.
/// |thread| POOL_DATA.set(&pool_data, || thread.run()),
/// // Do some work that needs the TLS data.
/// |pool| pool.install(|| assert!(POOL_DATA.is_set())),
/// )?;
///
/// // Once we've returned, `pool_data` is no longer borrowed.
/// drop(pool_data);
/// Ok(())
/// }
/// ```
pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
where
W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
F: FnOnce(&ThreadPool) -> R,
{
let result = crossbeam_utils::thread::scope(|scope| {
let wrapper = &wrapper;
let pool = self
.spawn_handler(|thread| {
let mut builder = scope.builder();
if let Some(name) = thread.name() {
builder = builder.name(name.to_string());
}
if let Some(size) = thread.stack_size() {
builder = builder.stack_size(size);
}
builder.spawn(move |_| wrapper(thread))?;
Ok(())
})
.build()?;
Ok(with_pool(&pool))
});

match result {
Ok(result) => result,
Err(err) => unwind::resume_unwinding(err),
}
}
}

impl<S> ThreadPoolBuilder<S> {
/// Set a custom function for spawning threads.
///
/// Note that the threads will not exit until after the pool is dropped. It
/// is up to the caller to wait for thread termination if that is important
/// for any invariants. For instance, threads created in [`crossbeam::scope`]
/// will be joined before that scope returns, and this will block indefinitely
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
/// until the entire process exits!
///
/// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
///
/// # Examples
///
/// A minimal spawn handler just needs to call `run()` from an independent thread.
///
/// ```
/// # use rayon_core as rayon;
/// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
/// let pool = rayon::ThreadPoolBuilder::new()
/// .spawn_handler(|thread| {
/// std::thread::spawn(|| thread.run());
/// Ok(())
/// })
/// .build()?;
///
/// pool.install(|| println!("Hello from my custom thread!"));
/// Ok(())
/// }
/// ```
///
/// The default spawn handler sets the name and stack size if given, and propagates
/// any errors from the thread builder.
///
/// ```
/// # use rayon_core as rayon;
/// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
/// let pool = rayon::ThreadPoolBuilder::new()
/// .spawn_handler(|thread| {
/// let mut b = std::thread::Builder::new();
/// if let Some(name) = thread.name() {
/// b = b.name(name.to_owned());
/// }
/// if let Some(stack_size) = thread.stack_size() {
/// b = b.stack_size(stack_size);
/// }
/// b.spawn(|| thread.run())?;
/// Ok(())
/// })
/// .build()?;
///
/// pool.install(|| println!("Hello from my fully custom thread!"));
/// Ok(())
/// }
/// ```
pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
where
F: FnMut(ThreadBuilder) -> io::Result<()>,
{
ThreadPoolBuilder {
spawn_handler: CustomSpawn::new(spawn),
// ..self
num_threads: self.num_threads,
panic_handler: self.panic_handler,
get_thread_name: self.get_thread_name,
stack_size: self.stack_size,
start_handler: self.start_handler,
exit_handler: self.exit_handler,
breadth_first: self.breadth_first,
}
}

/// Returns a reference to the current spawn handler.
fn get_spawn_handler(&mut self) -> &mut S {
&mut self.spawn_handler
}

/// Get the number of threads that will be used for the thread
/// pool. See `num_threads()` for more information.
Expand Down Expand Up @@ -276,7 +455,7 @@ impl ThreadPoolBuilder {
/// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
/// variable. If both variables are specified, `RAYON_NUM_THREADS` will
/// be prefered.
pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolBuilder {
pub fn num_threads(mut self, num_threads: usize) -> Self {
self.num_threads = num_threads;
self
}
Expand All @@ -300,7 +479,7 @@ impl ThreadPoolBuilder {
/// If the panic handler itself panics, this will abort the
/// process. To prevent this, wrap the body of your panic handler
/// in a call to `std::panic::catch_unwind()`.
pub fn panic_handler<H>(mut self, panic_handler: H) -> ThreadPoolBuilder
pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
where
H: Fn(Box<Any + Send>) + Send + Sync + 'static,
{
Expand Down Expand Up @@ -368,7 +547,7 @@ impl ThreadPoolBuilder {
/// Note that this same closure may be invoked multiple times in parallel.
/// If this closure panics, the panic will be passed to the panic handler.
/// If that handler returns, then startup will continue normally.
pub fn start_handler<H>(mut self, start_handler: H) -> ThreadPoolBuilder
pub fn start_handler<H>(mut self, start_handler: H) -> Self
where
H: Fn(usize) + Send + Sync + 'static,
{
Expand All @@ -387,7 +566,7 @@ impl ThreadPoolBuilder {
/// Note that this same closure may be invoked multiple times in parallel.
/// If this closure panics, the panic will be passed to the panic handler.
/// If that handler returns, then the thread will exit normally.
pub fn exit_handler<H>(mut self, exit_handler: H) -> ThreadPoolBuilder
pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
where
H: Fn(usize) + Send + Sync + 'static,
{
Expand Down Expand Up @@ -503,7 +682,7 @@ pub fn initialize(config: Configuration) -> Result<(), Box<Error>> {
config.into_builder().build_global().map_err(Box::from)
}

impl fmt::Debug for ThreadPoolBuilder {
impl<S> fmt::Debug for ThreadPoolBuilder<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let ThreadPoolBuilder {
ref num_threads,
Expand All @@ -512,6 +691,7 @@ impl fmt::Debug for ThreadPoolBuilder {
ref stack_size,
ref start_handler,
ref exit_handler,
spawn_handler: _,
ref breadth_first,
} = *self;

Expand Down
26 changes: 26 additions & 0 deletions rayon-core/src/private.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! The public parts of this private module are used to create traits
//! that cannot be implemented outside of our own crate. This way we
//! can feel free to extend those traits without worrying about it
//! being a breaking change for other implementations.
/// If this type is pub but not publicly reachable, third parties
/// can't name it and can't implement traits using it.
#[allow(missing_debug_implementations)]
pub struct PrivateMarker;

macro_rules! private_decl {
() => {
/// This trait is private; this method exists to make it
/// impossible to implement outside the crate.
#[doc(hidden)]
fn __rayon_private__(&self) -> ::private::PrivateMarker;
}
}

macro_rules! private_impl {
() => {
fn __rayon_private__(&self) -> ::private::PrivateMarker {
::private::PrivateMarker
}
}
}
Loading

0 comments on commit 49fb38a

Please sign in to comment.