Skip to content

Commit

Permalink
avoid using @sync_add on remotecalls (#44671)
Browse files Browse the repository at this point in the history
* avoid using `@sync_add` on remotecalls

It seems like @sync_add adds the Futures to a queue (Channel) for @sync, which
in turn calls wait() for all the futures synchronously. Not only that is
slightly detrimental for network operations (latencies add up), but in case of
Distributed the call to wait() may actually cause some compilation on remote
processes, which is also wait()ed for. In result, some operations took a great
amount of "serial" processing time if executed on many workers at once.

For me, this closes #44645.

The major change can be illustrated as follows: First add some workers:

```
using Distributed
addprocs(10)
```

and then trigger something that, for example, causes package imports on the
workers:

```
using SomeTinyPackage
```

In my case (importing UnicodePlots on 10 workers), this improves the loading
time over 10 workers from ~11s to ~5.5s.

This is a far bigger issue when worker count gets high. The time of the
processing on each worker is usually around 0.3s, so triggering this problem
even on a relatively small cluster (64 workers) causes a really annoying delay,
and running `@everywhere` for the first time on reasonable clusters (I tested
with 1024 workers, see #44645) usually takes more than 5 minutes. Which sucks.

Anyway, on 64 workers this reduces the "first import" time from ~30s to ~6s,
and on 1024 workers this seems to reduce the time from over 5 minutes (I didn't
bother to measure that precisely now, sorry) to ~11s.

Related issues:
- Probably fixes #39291.
- #42156 is a kinda complementary -- it removes the most painful source of
  slowness (the 0.3s precompilation on the workers), but the fact that the
  wait()ing is serial remains a problem if the network latencies are high.

May help with #38931

Co-authored-by: Valentin Churavy <[email protected]>
(cherry picked from commit 62e0729)
  • Loading branch information
exaexa authored and KristofferC committed Jul 4, 2022
1 parent 5ec2d68 commit 1b60be5
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
37 changes: 36 additions & 1 deletion base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,12 @@ isolating the asynchronous code from changes to the variable's value in the curr
Interpolating values via `\$` is available as of Julia 1.4.
"""
macro async(expr)
do_async_macro(expr)
end

# generate the code for @async, possibly wrapping the task in something before
# pushing it to the wait queue.
function do_async_macro(expr; wrap=identity)
letargs = Base._lift_one_interp!(expr)

thunk = esc(:(()->($expr)))
Expand All @@ -414,14 +420,43 @@ macro async(expr)
let $(letargs...)
local task = Task($thunk)
if $(Expr(:islocal, var))
put!($var, task)
put!($var, $(wrap(:task)))
end
schedule(task)
task
end
end
end

# task wrapper that doesn't create exceptions wrapped in TaskFailedException
struct UnwrapTaskFailedException
task::Task
end

# common code for wait&fetch for UnwrapTaskFailedException
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
try
f(t.task)
catch ex
if ex isa TaskFailedException
throw(ex.task.exception)
else
rethrow()
end
end
end

# the unwrapping for above task wrapper (gets triggered in sync_end())
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)

# same for fetching the tasks, for convenience
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)

# macro for running async code that doesn't throw wrapped exceptions
macro async_unwrap(expr)
do_async_macro(expr, wrap=task->:(Base.UnwrapTaskFailedException($task)))
end

# Capture interpolated variables in $() and move them to let-block
function _lift_one_interp!(e)
letargs = Any[] # store the new gensymed arguments
Expand Down
4 changes: 2 additions & 2 deletions stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
hash, ==, kill, close, isopen, showerror

# imports for use
using Base: Process, Semaphore, JLOptions, buffer_writes, @sync_add,
using Base: Process, Semaphore, JLOptions, buffer_writes, @async_unwrap,
VERSION_STRING, binding_module, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, shell_escape_wincmd, escape_microsoft_c_args,
Expand Down Expand Up @@ -75,7 +75,7 @@ function _require_callback(mod::Base.PkgId)
# broadcast top-level (e.g. from Main) import/using from node 1 (only)
@sync for p in procs()
p == 1 && continue
@sync_add remotecall(p) do
@async_unwrap remotecall_wait(p) do
Base.require(mod)
nothing
end
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ An exception is raised if a global constant is requested to be cleared.
"""
function clear!(syms, pids=workers(); mod=Main)
@sync for p in pids
@sync_add remotecall(clear_impl!, p, syms, mod)
@async_unwrap remotecall_wait(clear_impl!, p, syms, mod)
end
end
clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod)
Expand Down
4 changes: 2 additions & 2 deletions stdlib/Distributed/src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ function remotecall_eval(m::Module, procs, ex)
if pid == myid()
run_locally += 1
else
@sync_add remotecall(Core.eval, pid, m, ex)
@async_unwrap remotecall_wait(Core.eval, pid, m, ex)
end
end
yield() # ensure that the remotecall_fetch have had a chance to start
yield() # ensure that the remotecalls have had a chance to start

# execute locally last as we do not want local execution to block serialization
# of the request to remote nodes.
Expand Down

0 comments on commit 1b60be5

Please sign in to comment.