From 5a64315941ede70352a1ac305be33263a0d33009 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Fri, 25 Jun 2021 16:12:05 +0200 Subject: [PATCH] make msg handling use `@spawn` --- stdlib/Distributed/src/cluster.jl | 4 ++-- stdlib/Distributed/src/process_messages.jl | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 3b3be7ed33f0d..7a0d936a10a9e 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -1053,13 +1053,13 @@ function rmprocs(pids...; waitfor=typemax(Int)) pids = vcat(pids...) if waitfor == 0 - t = @async _rmprocs(pids, typemax(Int)) + t = Threads.@spawn _rmprocs(pids, typemax(Int)) yield() return t else _rmprocs(pids, waitfor) # return a dummy task object that user code can wait on. - return @async nothing + return Threads.@spawn nothing end end diff --git a/stdlib/Distributed/src/process_messages.jl b/stdlib/Distributed/src/process_messages.jl index 8d5dac5af571e..4a98584ccc7be 100644 --- a/stdlib/Distributed/src/process_messages.jl +++ b/stdlib/Distributed/src/process_messages.jl @@ -78,7 +78,7 @@ function schedule_call(rid, thunk) rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) - errormonitor(@async run_work_thunk(rv, thunk)) + errormonitor(Threads.@spawn run_work_thunk(rv, thunk)) return rv end end @@ -111,7 +111,7 @@ end ## message event handlers ## function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true) - errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming)) + errormonitor(Threads.@spawn process_tcp_streams(r_stream, w_stream, incoming)) end function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool) @@ -141,7 +141,7 @@ Julia version number to perform the authentication handshake. See also [`cluster_cookie`](@ref). """ function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) - errormonitor(@async message_handler_loop(r_stream, w_stream, incoming)) + errormonitor(Threads.@spawn message_handler_loop(r_stream, w_stream, incoming)) end function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) @@ -274,7 +274,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version) schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) end function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version) - errormonitor(@async begin + errormonitor(Threads.@spawn begin v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false) if isa(v, SyncTake) try @@ -290,7 +290,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi end function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) - errormonitor(@async begin + errormonitor(Threads.@spawn begin rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c)) nothing @@ -298,7 +298,7 @@ function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) end function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version) - errormonitor(@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)) + errormonitor(Threads.@spawn run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)) end function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)