Spawning tasks to multiple thread pools from a single scope #782
I've got a bunch of tasks to process on many thread pools. For example, some tasks are CPU-bound, so they should go to the CPU thread pool, while other tasks are I/O-bound and I want them processed in a different pool dedicated to I/O. Additionally, all tasks need to use a common parent object with a lifetime shorter than 'static, so I need a single scope wrapped around all of that processing.

Is it possible to do this with rayon?
We don't have a single scope tied to multiple pools, but you could use nested scopes for this, something like:

let common = common_init();
io_pool.scope(|io_scope| {
    io_scope.spawn(|_| early_io_work(&common, 1));
    io_scope.spawn(|_| early_io_work(&common, 2));
    cpu_pool.scope(|cpu_scope| {
        cpu_scope.spawn(|_| cpu_work(&common, 1));
        io_scope.spawn(|_| io_work(&common, 1));
        cpu_scope.spawn(|_| cpu_work(&common, 2));
        io_scope.spawn(|_| io_work(&common, 2));
        // ...
    }); // waits for cpu_scope to complete
    io_scope.spawn(|_| late_io_work(&common, 1));
    io_scope.spawn(|_| late_io_work(&common, 2));
}); // waits for io_scope to complete
drop(common);
Ok, now what if the number of thread pools is known only at runtime? Recursion?
Yeah, recursive nesting should be possible. I guess we might be able to control that better within rayon, since we can do it iteratively and still enforce the scope lifetimes, with a signature something like:

pub fn scopes<'scope, OP, R>(pools: &[ThreadPool], op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [Scope<'scope>]) -> R + 'scope

(or some variation of that). But this seems like an unusual thing to want. I can understand separating CPU/IO pools, although something like tokio might be better for the IO part. What is a situation where you'd want a dynamic number of thread pools?
I have a collection of objects, and processing some of them requires I/O, but I/O performance is bad if multiple threads try to access the same hard drive. However, it is perfectly fine to access two or more drives independently. So I need one thread pool per device, and the number of threads is configured by the capabilities of the device (which are different for SSDs and HDDs). The first thing I searched for was a map_async method that could accept a lambda returning a Future. I found something like that in the parallel-stream crate, but unfortunately that crate doesn't do the other useful things that rayon does. As for Tokio, I don't want to bring yet another big crate into my dependencies, because rayon is fine in the other parts.
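For reference, building one pool per device at runtime is straightforward with ThreadPoolBuilder. A minimal sketch, assuming a hypothetical Device type whose max_concurrency field reflects what the drive can handle:

use rayon::{ThreadPool, ThreadPoolBuilder};

// Hypothetical device descriptor; max_concurrency would come from probing
// whether the underlying drive is an SSD or an HDD.
struct Device {
    max_concurrency: usize,
}

// Build one dedicated I/O pool per device, sized to that device's capabilities.
fn build_io_pools(devices: &[Device]) -> Vec<ThreadPool> {
    devices
        .iter()
        .map(|dev| {
            ThreadPoolBuilder::new()
                .num_threads(dev.max_concurrency)
                .build()
                .expect("failed to build thread pool")
        })
        .collect()
}

The open question in the rest of this thread is how to spawn into all of those pools from a single scope.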
Ok, I'm "almost" there. Narrowed down to only one error:

fn nest<'scope, 'pool, 'vec, OP, R>(pools: &'pool [ThreadPool], scopes: Vec<&'vec Scope<'scope>>, op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [&'s Scope<'scope>]) -> R + 'scope + Send,
    R: Send,
    'pool: 'scope,
{
    if pools.len() > 0 {
        let new_len = pools.len() - 1;
        pools[new_len].scope(move |s| {
            let mut scopes: Vec<&Scope> = scopes;
            scopes.push(s);
            nest(&pools[0..new_len], scopes, |scopes| op(scopes))
        })
    } else {
        (op)(&scopes)
    }
}

pub fn scopes<'scope, 'pool, OP, R>(pools: &'pool [ThreadPool], op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [&'s Scope<'scope>]) -> R + 'scope + Send,
    R: Send,
    'pool: 'scope,
{
    nest(pools, Vec::with_capacity(pools.len()), |scopes| op(scopes))
}

I guess this has something to do with the fact that the scope reference added to the vector has a shorter lifetime than the previous scope references in it (encoded in the type of the vector), but I don't know how to change the type of the vector so it can hold the new reference - at the end, I expect 'vec to match the most nested scope... I need a Rust lifetime wizard now ;)
I managed to minimize the problem to the following snippet (took out the recursion):

fn nest<'scope, 'vec>(pool: &ThreadPool, scopes: Vec<&'vec Scope<'scope>>)
{
    pool.scope(move |s: &Scope| {
        let mut v = Vec::new();
        v.push(s);
        v.push(scopes[0])
    })
}

Commenting out either of the push lines makes the compiler happy.
I tried to reproduce this problem without Rayon and now I'm puzzled even more. This compiles fine:

struct FooScope<'a> { _unused: &'a u32 }

fn with_foo_scope<'scope, F>(op: F)
    where F: for<'s> FnOnce(&'s FooScope<'scope>) + 'scope + Send {
}

fn lifetimes<'scope, 'vec>(foos: &'vec [FooScope<'scope>])
{
    with_foo_scope(move |s: &FooScope| {
        let x = [s, &foos[0]];
    })
}

But very similar code with rayon doesn't:

fn nest<'scope, 'vec>(pool: &ThreadPool, scopes: &'vec [Scope<'scope>])
{
    pool.scope(move |s: &Scope| {
        let x = [s, &scopes[0]];
    })
}

What is the difference?
@pkolaczk maybe this is due to the invariance of Scope<'scope>?
That looks likely. So is there no way of keeping more than one scope in a collection together?
Indeed, this is a problem with invariance. The offending line is the marker:

use std::marker::PhantomData;

pub struct FooScope<'scope> {
    marker: PhantomData<Box<dyn FnOnce(&FooScope<'scope>) + Send + Sync + 'scope>>,
}

fn with_foo_scope<'scope, F, R>(op: F)
    where F: for<'s> FnOnce(&'s FooScope<'scope>) -> R + 'scope + Send,
          R: Send
{
}

fn lifetimes<'scope, 'vec>(scopes: &'vec [FooScope<'scope>])
{
    with_foo_scope(move |s: &FooScope| {
        let x = [s, &scopes[0]]; // error: data from `scopes` flows into `scopes` here
    })
}
Because of the invariance, you would need all the scopes to have the same outer 'scope lifetime. So this seems to be something we could only provide from rayon internals, where we can build the scopes with that shared lifetime ourselves. In case this is a question: the scope invariance is indeed necessary. Otherwise it would be possible to spawn closures with smaller lifetimes, but the scope would only enforce that they complete before the original scope lifetime, which is too late.
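To illustrate why that would be too late, a minimal sketch of the misuse that shrinking the scope lifetime would permit; the offending spawn is left commented out, since the invariance rejects exactly this, and the wrapper function name is only for the example:

fn invariance_motivation() {
    rayon::scope(|s| {
        {
            let short_lived = String::from("dropped before the scope ends");
            println!("inner block still owns: {}", short_lived);
            // If &Scope<'outer> could shrink to &Scope<'short>, a spawn here could
            // borrow short_lived with only a 'short bound:
            // s.spawn(|_| println!("{}", short_lived));
        } // ...but short_lived is dropped here, while the scope only waits for its
          // spawned jobs at the end of the *outer* lifetime -- a dangling borrow.
        s.spawn(|_| println!("spawns tied to the outer lifetime are fine"));
    });
}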
Ok, makes sense. Can you give me any hints on where to start from inside rayon?
See the scope implementation in rayon-core.
Thank you. Before I dive into Rayon internals, I managed to unblock myself with a little bit of unsafe:

unsafe fn adjust_lifetime<'s, 'a, 'b>(scope: &'s Scope<'a>) -> &'s Scope<'b> {
    std::mem::transmute::<&'s Scope<'a>, &'s Scope<'b>>(scope)
}

fn nest<'scope, OP, R>(pools: &[&ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [&'s Scope<'scope>]) -> R + 'scope + Send,
    R: Send,
{
    if pools.len() > 0 {
        pools[0].scope(move |s: &Scope| {
            let mut vec = scopes;
            vec.push(unsafe { adjust_lifetime(s) });
            nest(&pools[1..], vec, op)
        })
    } else {
        (op)(&scopes)
    }
}

pub fn scopes<'scope, OP, R>(pools: &[&ThreadPool], op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [&'s Scope<'scope>]) -> R + 'scope + Send,
    R: Send,
{
    nest(pools, Vec::with_capacity(pools.len()), op)
}

Is such use of transmute sound?
And this is how to use it (for anybody who is interested):

#[test]
fn test_two_pools() {
    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    scopes(&[&pool1, &pool2], |scopes| {
        let s1 = scopes[0];
        let s2 = scopes[1];
        for _ in 1..10000 {
            s1.spawn(|_| println!("thread-pool 1: {:?}", std::thread::current().id()));
        }
        for _ in 1..10000 {
            s2.spawn(|_| println!("thread-pool 2: {:?}", std::thread::current().id()));
        }
    });
}
I think your unsafe code is OK, in that nothing will actually observe the transmuted lifetimes in a way that could cause unsoundness, but I hope we can do better... I'm actually wondering now -- I don't think OP really needs to be tied to the 'scope lifetime at all. For example, something like this should be possible:

let vec: Vec<u32> = (0..100).collect();
let slice = &vec[..];
rayon::scope(|s: &rayon::Scope<'static>| {
    // We should be able to borrow here, but the spawns have to be static
    for &i in slice {
        s.spawn(move |_| println!("hello {}!", i));
    }
});

If we relax that constraint, we can create scopes for different lifetimes than OP itself can satisfy, even fully 'static, which would let you collect the invariant scopes from different contexts without the transmute.
See #785 -- I hope you don't mind that I adapted your nesting code into a test case.
785: Remove the lifetime constraint from the scope OP r=cuviper a=cuviper

We already know the `OP` lifetime must outlive the `scope` call itself, and we'll execute it synchronously, but we don't need to tie that to the `'scope` lifetime. By relaxing this, we can create scopes for different lifetimes than `OP` can satisfy, even fully `'static`. They will still await completion by the end of the `scope` call, but this can be useful for collecting the invariant scopes from different contexts, as the new nested tests demonstrate (adapted from @pkolaczk's code in #782).

790: Micro-optimize the WorkerThread::steal loop r=cuviper a=cuviper

This lifts up the loop for `Steal::Retry`, so we continue with other threads before trying again. For most benchmarks, the performance change is in the noise, but this gives a consistent improvement in most of the microbenches -- up to 8% faster on `join_microbench::increment_all_max`.

791: Use crossbeam_deque::Injector instead of crossbeam_queue::SegQueue r=cuviper a=cuviper

`Injector` and `SegQueue` are _almost_ identical, down to the very same comments in their implementations. One difference is that `Injector` allocates its first block as soon as it's created, but `SegQueue` waits until its first `push`, which complicates it to allow being `null`. `Injector` also has methods to steal batches into a deque `Worker`, which might be useful to us. At the very least, this lets us trim a dependency.

Co-authored-by: Josh Stone <[email protected]>
#785 is merged and published in rayon-core 1.8.0 and rayon 1.4.0. Is there anything else you need?
If anyone comes across this, here's a code snippet. It uses in_place_scope:

fn scope_all<'a, 'scope>(
    pools: impl ExactSizeIterator<Item = &'a rayon::ThreadPool> + Send,
    f: impl FnOnce(Vec<&rayon::Scope<'scope>>) + Send + 'scope,
) {
    #[inline]
    fn recursive_scope<'a, 'scope>(
        mut pools: impl Iterator<Item = &'a rayon::ThreadPool> + Send,
        scopes: Vec<&rayon::Scope<'scope>>,
        f: impl FnOnce(Vec<&rayon::Scope<'scope>>) + Send + 'scope,
    ) {
        match pools.next() {
            None => return f(scopes),
            Some(pool) => {
                pool.in_place_scope(move |s| {
                    let mut scopes = scopes;
                    scopes.push(s);
                    recursive_scope(pools, scopes, f);
                });
            }
        }
    }

    let vec = Vec::with_capacity(pools.len());
    recursive_scope(pools, vec, f)
}
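A minimal usage sketch for the scope_all helper above, mirroring the earlier test_two_pools example (the pool sizes and the test name are arbitrary):

#[test]
fn test_scope_all_two_pools() {
    let pool1 = rayon::ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    // An array iterator of &ThreadPool satisfies the ExactSizeIterator + Send bound.
    scope_all([&pool1, &pool2].into_iter(), |scopes| {
        scopes[0].spawn(|_| println!("thread-pool 1: {:?}", std::thread::current().id()));
        scopes[1].spawn(|_| println!("thread-pool 2: {:?}", std::thread::current().id()));
    });
}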
@stevenengler Nice! But yeah, I don't expect tail calls, because each scope has cleanup (waiting for its spawns) as it returns.