Skip to content

Commit

Permalink
make msg handling use @spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
vchuravy committed Jun 25, 2021
1 parent dd929c9 commit 5a64315
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
4 changes: 2 additions & 2 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1053,13 +1053,13 @@ function rmprocs(pids...; waitfor=typemax(Int))

pids = vcat(pids...)
if waitfor == 0
t = @async _rmprocs(pids, typemax(Int))
t = Threads.@spawn _rmprocs(pids, typemax(Int))
yield()
return t
else
_rmprocs(pids, waitfor)
# return a dummy task object that user code can wait on.
return @async nothing
return Threads.@spawn nothing
end
end

Expand Down
12 changes: 6 additions & 6 deletions stdlib/Distributed/src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function schedule_call(rid, thunk)
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
errormonitor(@async run_work_thunk(rv, thunk))
errormonitor(Threads.@spawn run_work_thunk(rv, thunk))
return rv
end
end
Expand Down Expand Up @@ -111,7 +111,7 @@ end

## message event handlers ##
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming))
errormonitor(Threads.@spawn process_tcp_streams(r_stream, w_stream, incoming))
end

function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
Expand Down Expand Up @@ -141,7 +141,7 @@ Julia version number to perform the authentication handshake.
See also [`cluster_cookie`](@ref).
"""
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
errormonitor(@async message_handler_loop(r_stream, w_stream, incoming))
errormonitor(Threads.@spawn message_handler_loop(r_stream, w_stream, incoming))
end

function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
Expand Down Expand Up @@ -274,7 +274,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
end
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
errormonitor(@async begin
errormonitor(Threads.@spawn begin
v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false)
if isa(v, SyncTake)
try
Expand All @@ -290,15 +290,15 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
end

function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
errormonitor(@async begin
errormonitor(Threads.@spawn begin
rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
nothing
end)
end

function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
errormonitor(@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true))
errormonitor(Threads.@spawn run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true))
end

function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
Expand Down

0 comments on commit 5a64315

Please sign in to comment.