Skip to content

Commit

Permalink
more threads?
Browse files Browse the repository at this point in the history
  • Loading branch information
vchuravy authored and Sacha0 committed Jun 22, 2021
1 parent 8f89925 commit 20a514f
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
t = @async exec_conn_func(w)
t = Threads.@spawn exec_conn_func(w)
else
# route request via node 1
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
errormonitor(t)
wait_for_conn(w)
Expand Down Expand Up @@ -258,7 +258,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
else
sock = listen(interface, LPROC.bind_port)
end
errormonitor(@async while isopen(sock)
errormonitor(Threads.@spawn while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end)
Expand Down Expand Up @@ -290,7 +290,7 @@ end


function redirect_worker_output(ident, stream)
t = @async while !eof(stream)
t = Threads.@spawn while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
Expand Down Expand Up @@ -496,7 +496,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)

@sync begin
while true
if isempty(launched)
if isempty(launched) # XXX: Turn this into a Channel
istaskdone(t_launch) && break
@async (sleep(1); notify(launch_ntfy))
wait(launch_ntfy)
Expand All @@ -505,7 +505,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
if !isempty(launched)
wconfig = popfirst!(launched)
let wconfig=wconfig
@async setup_launched_worker(manager, wconfig, launched_q)
Threads.@spawn setup_launched_worker(manager, wconfig, launched_q)
end
end
end
Expand Down Expand Up @@ -584,7 +584,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
wconfig.port = port

let wconfig=wconfig
@async begin
Threads.@spawn begin
pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
Expand Down Expand Up @@ -1238,7 +1238,7 @@ function interrupt(pids::AbstractVector=workers())
@assert myid() == 1
@sync begin
for pid in pids
@async interrupt(pid)
Threads.@spawn interrupt(pid)
end
end
end
Expand Down

0 comments on commit 20a514f

Please sign in to comment.