RFC: Simplifying and generalising pmap #14843

Closed
samoconnor opened this issue Jan 29, 2016 · 33 comments
Labels
parallelism Parallel or distributed computation

Comments

@samoconnor
Contributor

[ Update: https://github.com//pull/15409 implements most of this and is now merged ]

This issue covers some of the same ground as #12943 (stale), #14515 and #14736. My intention is to present an overall pmap refactoring plan here for comment and submit new PRs if it is seen as useful.

pmap Features

The current pmap implementation has the following features:

  1. Using @sync/@async to run the mapped function in parallel and collect results.
  2. Automatic allocation of idle workers to remotecall_wait calls.
  3. Not reusing workers believed to be faulty.
  4. Option to retry on error.
  5. Option to continue mapping despite errors (returning exceptions in the result array).

These are all useful features. However, it seems to me that pmap currently tries to do too much and that many of these features would be useful in contexts other than pmap:

  • "1." is useful when using RPC mechanisms other than remotecall (e.g. HTTP, readstring(::Cmd) or AWS Lambda).
  • "2." and "3." are probably useful to other users of the remotecall mechanism.
  • If "4." and "5." are useful with pmap they should also be useful with ordinary map.

Proposed Separation of Features

asyncmap (originally proposed as amap) #15058

asyncmap adds @sync/@async to regular map. (feature "1.")

# Run `::Cmd`s in parallel...
julia> @time map(t->run(`sleep $t`), 1:3)
6 seconds
julia> @time asyncmap(t->run(`sleep $t`), 1:3)
3 seconds

# Run downloads in parallel...
julia> url_data = asyncmap(download, urls)
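A small, self-contained check of the behaviour described above, written against the asyncmap that ships in Base in Julia 1.x (keyword names there differ from this early proposal). Sleeps stand in for slow I/O; the sketch records completion order to show the calls overlap, and shows that results still come back in input order.

```julia
# Delays in milliseconds; the longest job is first.
delays_ms = [300, 100, 200]
finished = Int[]   # completion order (tasks share one thread, so push! is safe)

results = asyncmap(delays_ms) do ms
    sleep(ms / 1000)   # simulated slow I/O; yields to the other tasks
    push!(finished, ms)
    ms
end

println(results)    # input order
println(finished)   # completion order
```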

WorkerPool and remote #15073

A WorkerPool keeps track of which workers are busy. take!(default_worker_pool()) yields a worker that is not already busy doing remotecall_wait, waiting if needed (feature "2."). If there is a reliable way to identify "faulty" workers (feature "3."), then take! will not return faulty workers. (Partial implementation in comments of: #14736)

remote takes a function and returns a lambda that executes the function on a remote worker.

Using asyncmap and remote together...

pmap(f, c...) = asyncmap(remote(f), c...)
function remotecall_fetch(f, args...)
    remotecall_fetch(f, default_worker_pool(), args...)
end
remote(f::Function) = (args...)->remotecall_fetch(f, args...)
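For comparison, a sketch of how the remote/WorkerPool combination looks in the Distributed standard library of Julia 1.x, where the pool is passed first: remote(pool, f). To keep it runnable without spawning processes, the pool here holds only the current process; in real use you would call addprocs first and build the pool from workers().

```julia
using Distributed

# A pool holding only this process; with real workers use WorkerPool(workers())
pool = WorkerPool([myid()])

# remote(pool, f) returns a function that calls remotecall_fetch(f, pool, args...)
square_remotely = remote(pool, x -> x^2)

# asyncmap + remote: each element goes to whichever pool worker is free
squares = asyncmap(square_remotely, 1:5)

# pmap also accepts the pool as a positional argument
incremented = pmap(x -> x + 1, pool, 1:3)
```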

@catch #15409

@catch takes a function and returns a lambda that catches any exceptions thrown by the function and returns the exception object.

pmap(f, v; err_stop=false) can be replaced with pmap(@catch(f), v) (feature "5.")

macro catch(f)
    # esc(f) so that f is resolved in the caller's module, not the macro's
    :((args...) -> try $(esc(f))(args...) catch ex; ex end)
end
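Julia 1.x Base has no @catch, so the sketch below uses a plain higher-order function to show the same idea. The names catching and checked_sqrt are hypothetical, chosen for illustration; the wrapper returns the exception object in place of a result, which is the behaviour described for pmap(@catch(f), v).

```julia
# Hypothetical helper (not part of Base): wrap f so any exception is returned instead of thrown
catching(f) = (args...) -> try
    f(args...)
catch ex
    ex
end

# A function that fails for negative inputs
checked_sqrt(x) = x < 0 ? throw(DomainError(x, "negative input")) : sqrt(x)

# map keeps going past the failure; the error object sits in the result array
results = map(catching(checked_sqrt), [4.0, -1.0, 9.0])
```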

retry #15409

retry takes a function and returns a lambda that retries the function if an error occurs (feature "4.").

pmap(f, v; err_retry=true) can be replaced with pmap(retry(f), v), or for more granular error handling pmap(retry(f, e -> isa(e, NetworkTimeoutError)), v)

function retry(f::Function, condition::Function=(e)->true, n::Integer=3)
    (args...) -> @repeat n try
        f(args...)
    catch e
        @retry if condition(e) end
    end
end
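Base in Julia 1.x has a retry with a different signature from the one proposed here: the condition becomes a check keyword that receives (state, exception), and the number of attempts is governed by the delays iterable rather than n. A sketch using that API, with a hypothetical flaky function that fails twice before succeeding:

```julia
attempts = Ref(0)

# Hypothetical flaky operation: throws an ErrorException on the first two calls
function flaky(x)
    attempts[] += 1
    attempts[] < 3 && error("transient failure")
    return 2x
end

# Retry only ErrorExceptions, waiting 10 ms between attempts, at most 3 retries
robust = retry(flaky; delays = fill(0.01, 3), check = (state, e) -> e isa ErrorException)

value = robust(21)
```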

(@repeat and @retry are implemented in https://github.com/samoconnor/Retry.jl)

pmap Defaults

pmap currently has some unexpected (at least to me) defaults.

  • The default behaviour is to re-execute the mapped function on error irrespective of the type of error (i.e. the function is assumed to be idempotent).
  • When the function throws an error, the worker is removed from the pool irrespective of the type of error (i.e. it is assumed that all errors are due to a faulty worker).
  • By default, errors thrown by the mapped function are returned as part of the result array rather than being re-thrown. This is inconsistent with regular map.
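In Julia 1.x, Distributed.pmap rethrows errors by default, and retrying or returning exceptions are opt-in keywords (on_error, retry_delays, retry_check). A sketch of opting in to feature "5.", run on the master process only so it needs no addprocs. The function fragile is hypothetical, and since the exception may arrive wrapped (for example in a RemoteException when real workers are involved), the check only tests for an Exception.

```julia
using Distributed

# Hypothetical mapped function that fails for one element
fragile(x) = x == 2 ? error("bad input: $x") : 10x

# Default: the error propagates, as with map
threw = try
    pmap(fragile, 1:3)
    false
catch
    true
end

# Opt in to returning exceptions in the result array
mixed = pmap(fragile, 1:3; on_error = ex -> ex)
```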
@kshyatt kshyatt added the parallelism Parallel or distributed computation label Jan 29, 2016
@bjarthur
Contributor

bjarthur commented Feb 8, 2016

how easy would it be to have pmap dynamically adjust to a changing number of workers in the pool? my dream is to have a non-blocking addprocs for use on my private shared SGE cluster. when the load is high, qsub'ing a large number of procs can take a while to completely finish. it would be huge if ClusterManagers.launch was asynchronous, and could add to the pool as soon as each proc started, instead of waiting until they all started. pmap could then get started right away, at first using just a few workers, and adding more as they became available. @amitmurthy

@amitmurthy
Contributor

@async addprocs(....) should work.

Changing pmap to use a shared queue, as discussed here - #14736 (comment), should allow us to start using started workers right away.

@amitmurthy
Contributor

To expand on the above:

  • The SGE cluster manager creates a WorkerPool consisting of the workers it has launched
  • The SGE manager's manage method, when called with op :register, adds newly launched workers to the worker pool as and when they come online; when called with op :deregister, it removes them.
  • pmap has an option to use a WorkerPool for distributing work

User code would be something like:

cman = SGEManager(np, queue)
@async addprocs(cman)
pmap(f, jobs; pids=worker_pool(cman))  # worker_pool is a method implemented by the SGE cluster manager and returns its pool of workers. 

@bjarthur
Contributor

bjarthur commented Feb 9, 2016

thanks amit. question: in your proposed interface, can a worker which comes online during the execution of a particular pmap be utilized by that same call to pmap? that is what i am hoping for.

@amitmurthy
Contributor

Yes.

@amitmurthy
Contributor

@samoconnor, the proposed changes look good. Look forward to the PRs.

@samoconnor
Contributor Author

@amitmurthy, in the interests of avoiding one big PR, I plan to start by submitting a PR that just adds amap (and its underlying iterator version imap) without changing any existing code, as follows. Do you agree with this approach?

amap(f, c...; async_max=100) → collection
Transform collection c by applying f to each element using at most 100 asynchronous tasks. For multiple collection arguments, apply f elementwise.
Note: amap(f, c...; async_max=1) is equivalent to map(f, c...).

Implementation:

amap(f, c...; kv...) = collect(imap(f, c...; kv...))

imap(f, c...; async_max=100) → iterator

Apply f to each element of c using at most 100 asynchronous tasks. For multiple collection arguments, apply f elementwise.
Note: collect(imap(f, c...; async_max=1)) is equivalent to map(f, c...).

Implementation using StreamMapItr:

imap(f, c...; async_max=nothing) = StreamMapItr(f, c...; async_max=async_max)

amap!(function, collection; async_max=100)
In-place version of amap().

amap!(function, destination, collection...; async_max=100)
Like amap(), but stores the result in destination rather than a new collection. destination must be at least as large as the first collection.

Implementation using AsyncMapItr:

function amap!(f, c...; async_max=nothing)

    destination = c[1]
    if length(c) > 1
        c = c[2:end]
    end

    for task in AsyncMapItr(f, destination, c..., async_max=async_max) end

    return destination
end
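Base in Julia 1.x has an asyncmap!(f, results, c...; ntasks) along these lines, with ntasks playing the role of async_max. A sketch that also counts how many calls are in flight, to confirm the concurrency bound; the active and peak counters are illustrative only.

```julia
dest = zeros(Int, 8)
active = Ref(0)   # calls currently inside the mapped function
peak = Ref(0)     # highest number of simultaneous calls observed

asyncmap!(dest, 1:8; ntasks = 2) do x
    active[] += 1
    peak[] = max(peak[], active[])
    sleep(0.01)   # yield so another task can start
    active[] -= 1
    x^2
end

# With ntasks = 1 the result matches plain map
serial = asyncmap(x -> x^2, 1:8; ntasks = 1)
```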

@bjarthur
Contributor

one benefit of a single large PR is that we could see all the proposed changes to the API at once. for example, i'm curious whether you plan to include an iterator-returning version of pmap. i proposed this myself once long ago: #4601

@blakejohnson
Contributor

👍 I definitely like the separation into composable chunks.

@tkelman
Contributor

tkelman commented Feb 11, 2016

Please don't open a PR that tries to change too many things at once. Incremental changes are much smoother to get reviewed and merged. The end vision can be seen "at once" in a package or separate branch, but PRs should be broken into smaller gradual chunks whenever possible.

@samoconnor
Contributor Author

@bjarthur: If pmap(f, c...) = amap(remote(f), c...) then an iterator version of pmap would be simply: ipmap(f, c...) = imap(remote(f), c...). I think this is most definitely a useful thing. I don't know if ipmap is the right name for it... (sounds like it has something to do with IP Addresses).

@amitmurthy
Contributor

Some thoughts. Apologies for the delayed response.

My understanding is that the suggested amap parallelizes over tasks and pmap over workers. Why don't we just have pmap that would use tasks, workers or a worker pool depending on a keyword arg. pmap as a parallel map is well understood. Async execution is typically understood to be non-blocking, i.e., returns immediately with the expression/function being executed in a different task.

Putting up some thoughts for discussion:

pmap(f, c...; ntasks=N, pids=< pid_array | worker_pool >) where

  • ntasks is the concurrent tasks per pid, defaults to 100
  • pids identifies either specific workers to distribute these tasks over or a worker pool

This will also help optimize pmap in cases where the data to be mapped is much larger than the number of workers: having pmap process chunks of the list at a time will reduce network roundtrips. @tanmaykm and @ViralBShah saw this issue for some of their workloads.

This version of pmap should also use threads when they become fully available. The list to be mapped should be distributed across remote workers, followed by threads and finally tasks.

samoconnor added a commit to samoconnor/julia that referenced this issue Feb 13, 2016
@samoconnor
Contributor Author

@amitmurthy, I like the idea of having pmap parallelise over tasks and workers.

I think the first step is still to add the underlying imap and amap functions that the eventual pmap would rely on. See PR #15058.

(I guess if pmap eventually provides an interface to amap that is just as convenient, then amap could be relegated to being an un-exported implementation function.)

@samoconnor
Contributor Author

... having pmap processes chunks of the list at a time will reduce network roundtrips. @tanmaykm , @ViralBShah saw this issue for some of their workloads.

@tanmaykm , @ViralBShah can you post test cases that cover the issues that @amitmurthy describes above?

@bjarthur
Contributor

in regards to the request for clearer names in #15058 (comment), it's not obvious to me, from the name alone, that the proposed imap is asynchronous.

how about the following refactoring of the interface:

  1. get rid of amap and amap! and rename the proposed imap to amap. to get a collection, the user would have to manually say collect(amap), just like they have to do for a dict's keys() and values().
  2. then later, re-factor pmap to also return an iterator, which gets asynchronously (and lazily?) filled, just like amap. so only map returns a collection, which minimizes breakage.

so in the end, we have this:

map -> collection
amap -> iterator
pmap -> iterator
remote -> function
@catch -> function
retry -> function

@samoconnor
Contributor Author

@bjarthur: I've taken your suggestion together with @StefanKarpinski's request for clearer names and revised #15058 (comment) to have just one function:

asyncmap(f, c...; ntasks=0) = StreamMapIterator(f, c...; ntasks=ntasks)

@samoconnor
Contributor Author

Reading the code for remotecall -> future and remotecall_fetch -> value leaves me asking: are these names the wrong way around? Should it be remotecall_future -> future and remotecall -> value?

remotecall(f, id, args...) -> value would be more consistent with call(f, args...) -> value.

The usual pattern in Julia seems to be that non-blocking implementations are hidden and public interfaces are blocking.
e.g. #14546 (comment)

[in the context of ::IO] Everything in Julia always blocks the task it's called from until it's done. Under the hood it's all non-blocking, but that's exposed to the programmer via task-level concurrency.

Perhaps the preferred approach should be to use @async with remotecall -> value instead of remotecall -> future.

In Base there are only 4 calls to remotecall vs 25-ish to remotecall_fetch.

A GitHub-wide search finds 359 matches for remotecall_fetch, 115 matches for remotecall_wait and 458 matches for remotecall (458 includes remotecall_* due to the way GitHub text search works).
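The two styles compared in this comment can be seen side by side with the Distributed API as it exists in Julia 1.x. The sketch targets the current process (myid()) so it runs without extra workers.

```julia
using Distributed

# Non-blocking style: remotecall returns a Future immediately
fut = remotecall(+, myid(), 1, 2)
a = fetch(fut)   # blocks until the value is available

# Blocking call made concurrent by wrapping it in a task
task = @async remotecall_fetch(+, myid(), 3, 4)
b = fetch(task)  # waits for the task and returns its result
```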

@samoconnor
Contributor Author

@amitmurthy: see PR #15073, WorkerPool and remote()

@amitmurthy
Contributor

I would rather not export asyncmap in #15058 if we are going to support task asynchronous execution via pmap itself.

I agree with your reasoning w.r.t. remotecall* names. It may be a bit late to change them....... What do others think?

samoconnor added a commit to samoconnor/julia that referenced this issue Feb 14, 2016
@samoconnor
Contributor Author

I would rather not export asyncmap in #15058

OK, done: samoconnor@e263ad6

@samoconnor
Contributor Author

I agree with your reasoning w.r.t. remotecall* names. It may be a bit late to change them...

Maybe if remote(f)(args...) per #15073 works well then remotecall* could eventually be deprecated.

@StefanKarpinski
Member

Could pmap support both a list of workers (or a count) and a number of tasks per worker? Then we could have a single function for this.

@samoconnor
Contributor Author

@StefanKarpinski:

Could pmap support both a list of workers (or a count) and a number of tasks per worker?

Yes, Amit suggested that in a previous comment.
Given Amit's goal of "having pmap processes chunks of the list at a time [to] reduce network roundtrips", I'm thinking of something simple like this to start with:

asyncpmap(f, v) = flatten( pmap(chunk -> asyncmap(f, chunk), eachchunk(v)) )

This assumes a reasonably balanced workload, which seems fine for now.
Another possible scheme would be to treat all "task slots" across all workers as a single pool. This would imply network comms for each task but would allow for unbalanced IO-bound workloads (e.g. downloading and indexing a large number of arbitrary URLs).

@tanmaykm
Member

@samoconnor The observation that @amitmurthy was referring to in #14843 (comment) was abhijithch/RecSys.jl#22 (comment)

A simple test would be:

julia> # $ julia -p 8

julia> f = (x)->nothing;

julia> @time @parallel for i in 1:10^4 f(i) end;
  0.117559 seconds (180.03 k allocations: 7.235 MB)

julia> @time pmap(f, 1:10^4);
  1.196159 seconds (2.87 M allocations: 77.740 MB, 1.11% gc time)

samoconnor added a commit to samoconnor/julia that referenced this issue Mar 7, 2016
@samoconnor
Contributor Author

@amitmurthy, @StefanKarpinski

I intend to open a third PR (in addition to #15058 and #15073) to implement @catch and retry as described above.

The retry function above relies on the @repeat n try... macro from here:
https://github.com/samoconnor/Retry.jl/blob/master/src/repeat_try.jl.

Question: Should the PR...

  • include repeat_try.jl as-is (with comment and code formatting cleaned up a little to match Base conventions) ?
  • or, implement retry without using @repeat. i.e. manually copy/pasting the logic from @repeat?

(Note: At this point I think Retry.jl is reasonably well field-tested. It is used in many places in the AWS* packages, which I've been using quite heavily in code for a client.)

samoconnor added a commit to samoconnor/julia that referenced this issue Mar 8, 2016
@samoconnor
Contributor Author

@amitmurthy I have submitted #15409 with @catch and a lightweight implementation of retry (not dependent on Retry.jl).

samoconnor added a commit to samoconnor/julia that referenced this issue Mar 9, 2016
@samoconnor
Contributor Author

Amit's notes pasted from #15073:

We will also need to:

  • export WorkerPool and related methods
  • document the concept and usage in the manual
  • mention remote and WorkerPool in NEWS.md

samoconnor added a commit to samoconnor/julia that referenced this issue Mar 9, 2016
@samoconnor
Contributor Author

Status

Next steps

New pmap might look something like this:

asyncmap(f, c...) = collect(StreamMapIterator(f, c...))

function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)

    if err_retry !== nothing
        depwarn("`err_retry` is deprecated, use `pmap(retry(f), c...)` or `asyncmap(remote(retry(f)), c...)`.", :pmap)
        if err_retry == true
            return asyncmap(retry(remote(f)), c...)
        end
    end

    if err_stop !== nothing
        depwarn("`err_stop` is deprecated, use `pmap(@catch(f), c...)`.", :pmap)
        # err_stop=false meant "keep going and return exceptions in the result array"
        if err_stop == false
            return asyncmap(remote(@catch(f)), c...)
        end
    end

    if pids !== nothing
        depwarn("`pids` is deprecated. It no longer has any effect.", :pmap)
    end

    return asyncmap(remote(f), c...)
end

Perhaps there should be a kw arg pool= to replace pids=. e.g...

remote(f, pool) = (args...)->remotecall_fetch(f, pool, args...)

pmap(f, c...; pool=default_worker_pool()) = asyncmap(remote(f, pool), c...)

samoconnor added a commit to samoconnor/julia that referenced this issue Mar 12, 2016
samoconnor added a commit to samoconnor/julia that referenced this issue Mar 22, 2016
samoconnor added a commit to samoconnor/julia that referenced this issue Mar 24, 2016
JuliaLang#14843

Add default small delay to retry.

50ms delay on first retry.
250ms delay on 2nd retry.

This at least gives other tasks a chance to run.

If retry n is set higher, the delay increases to 1250ms, 6250ms ...
max_delay caps the delay at 10s by default.

This should handle network-timescale issues without creating undue load.
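The schedule in this commit message is a geometric series: each delay is five times the previous one, starting at 50 ms and capped by max_delay. Base in Julia 1.x exposes a similar iterator, ExponentialBackOff; the sketch below disables its random jitter so the values are deterministic.

```julia
# n = number of delays; jitter = 0.0 turns off randomisation
delays = collect(ExponentialBackOff(n = 5, first_delay = 0.05, factor = 5.0,
                                    max_delay = 10.0, jitter = 0.0))
println(delays)   # 50 ms, 250 ms, 1.25 s, 6.25 s, then capped at 10 s
```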
samoconnor added a commit to samoconnor/julia that referenced this issue Mar 24, 2016
tweak test/error.jl per https://travis-ci.org/JuliaLang/julia/jobs/114700424#L1597
@samoconnor
Contributor Author

Refactored pmap is now in #15409.

function pgenerate(f, c)
    batches = batchsplit(c, min_batch_count = nworkers() * 3)
    return flatten(asyncgenerate(remote(b -> asyncmap(f, b)), batches))
end

pmap(f, c) = collect(pgenerate(f, c))
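A runnable approximation of this batching idea using public Julia 1.x functions in place of the internal batchsplit and asyncgenerate: Iterators.partition splits the input, each batch is mapped concurrently inside one pmap call, and Iterators.flatten restores a flat result. The batch size of 3 is arbitrary. Distributed.pmap also accepts a batch_size keyword that applies this kind of batching when real workers are present.

```julia
using Distributed

xs = 1:10
batches = collect(Iterators.partition(xs, 3))   # four batches of at most 3 elements

# One pmap call per batch; inside the batch, elements are mapped with asyncmap
per_batch = pmap(batch -> asyncmap(x -> x^2, batch), batches)
flat = collect(Iterators.flatten(per_batch))

# The built-in keyword form
builtin = pmap(x -> x^2, xs; batch_size = 3)
```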

samoconnor added a commit to samoconnor/julia that referenced this issue Mar 25, 2016
@samoconnor
Contributor Author

#15409 introduces 3 un-exported generate functions that might be generally useful:

Should these be exported for general use in a separate PR?

They are quite useful for chaining things together:

# For one...
result_url = upload_results(crunch_numbers(download_data(data_url)))

# For many in parallel...
result_urls = asyncmap(upload_results,
                       pgenerate(crunch_numbers,
                                 asyncgenerate(download_data, url_list)))
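A small sketch of the same chaining with functions available in Julia 1.x. The three stage functions are hypothetical stand-ins for real I/O. Composing them with ∘ and mapping the composed function lets each URL flow through all three stages in its own task, instead of waiting for a whole stage to finish before the next one starts.

```julia
# Hypothetical stage functions standing in for real I/O
download_data(url) = (sleep(0.01); length(url))   # pretend download: returns a size
crunch_numbers(n) = n * 2
upload_results(x) = "result-$x"

url_list = ["a", "bb", "ccc"]

# For one...
result_url = upload_results(crunch_numbers(download_data("abcd")))

# For many, concurrently: each element runs the whole pipeline in its own task
result_urls = asyncmap(upload_results ∘ crunch_numbers ∘ download_data, url_list)
```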

@StefanKarpinski
Member

I like where this is headed, but... any time we use naming to express combinations of behaviors, I feel like we're missing something. In this case, we have map vs. generate, sync vs. async, and local vs. distributed (which is somewhat unfortunately called parallel rather than distributed). It seems like an ideal case for something compositional rather than having some subset of the eight possible names.

@samoconnor
Contributor Author

FWIW #15409 tries to make things compositional under the covers.
e.g. asyncmap(f, c) = collect(asyncgenerate(f, c)),
pgenerate(f,c) = asyncgenerate(remote(f), c), and
pmap(f,c) = collect(pgenerate(f,c)).

My preference would be to make map and generate async by default (removing the need for asyncmap and asyncgenerate). There could still be special short-cut methods of map for cases where the types or size of the collection mean that a non-async implementation is faster. Maybe there could eventually be a static analysis that f never yields in some cases, thereby allowing the faster non-async map to be used.

I'd also remove pmap and just tell people to do map(remote(f), c) instead.

Then I'd make map return an iterator (removing the need for generate). This would cause the need for collect(map(f, c)) at some, but not all, call sites.

That would leave just: map and remote.

... maybe too extreme?

samoconnor added a commit to samoconnor/julia that referenced this issue Mar 29, 2016
samoconnor added a commit to samoconnor/julia that referenced this issue Mar 30, 2016
samoconnor added a commit to samoconnor/julia that referenced this issue Apr 12, 2016
samoconnor added a commit to samoconnor/julia that referenced this issue Apr 13, 2016
amitmurthy pushed a commit that referenced this issue Apr 14, 2016
* @catch, retry, partition, asyncmap and refactored pmap (#15409 and #14843)
@amitmurthy
Contributor

Closed by #15409

8 participants