-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pmap speedup #17331
pmap speedup #17331
Conversation
elseif isa(ntasks, Function) | ||
max_tasks = ntasks | ||
else | ||
throw(ArgumentError("ntasks must be an Integer or a zero-arg function returning the maximum number of tasks allowed.")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is a zero-arg function desirable for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allows for the number of concurrent tasks in AsyncCollector
to be dynamic. One of the requested features was that pmap must be able to leverage workers as and when they come online. This allows for a user of AsyncCollector
, like pmap
to increase count of parallel tasks depending on current number of workers. Note that it is not a major issue if the number of tasks is more than the number of workers, but it will be quite inefficient the other way around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so this would be accomplished via a zero-arg function that reads from some global runtime state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A WorkerPool
allows for workers to be dynamically added. pmap
now passes this keyword arg as ntasks=()->nworkers(p)
where p
is the worker pool.
674434b
to
0d385f5
Compare
@@ -51,7 +51,6 @@ function fetch(c::Channel) | |||
end | |||
|
|||
function take!(c::Channel) | |||
!isopen(c) && !isready(c) && throw(closed_exception()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does the added test in the next commit exercise this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well yes, because that is how I found the bug. But it ought to have an explicit test anyway. Will add one.
@@ -19,44 +19,97 @@ type AsyncCollector | |||
f | |||
results | |||
enumerator::Enumerate | |||
ntasks::Int | |||
max_tasks::Function | |||
c::Channel{Tuple{Int, Any}} # to communicate with the tasks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a slightly more descriptive field name than c
might be good for clarity's sake
AsyncCollector: Limit numer of tasks and communicate with channels
0d385f5
to
191796e
Compare
Partially addresses #17301
On my machine:
this PR takes 8.8 seconds vs 6.6 seconds on 0.4