From fe8b1a8b45b54ce834637f9fc1303eaaf1458b02 Mon Sep 17 00:00:00 2001 From: David Widmann Date: Fri, 11 Oct 2024 11:47:24 +0200 Subject: [PATCH] Use `Threads.@spawn` instead of `@async` --- src/DistributedStdlibWorker.jl | 4 ++-- src/Malt.jl | 18 +++++++++--------- src/worker.jl | 4 ++-- test/basic.jl | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/DistributedStdlibWorker.jl b/src/DistributedStdlibWorker.jl index 2d57ab4..fad9e98 100644 --- a/src/DistributedStdlibWorker.jl +++ b/src/DistributedStdlibWorker.jl @@ -27,7 +27,7 @@ mutable struct DistributedStdlibWorker <: AbstractWorker # TODO: process preamble from Pluto? # There's no reason to keep the worker process alive after the manager loses its handle. - w = finalizer(w -> @async(stop(w)), + w = finalizer(w -> Threads.@spawn(stop(w)), new(pid, true) ) atexit(() -> stop(w)) @@ -52,7 +52,7 @@ macro transform_exception(worker, ex) end function remote_call(f, w::DistributedStdlibWorker, args...; kwargs...) - @async Distributed.remotecall_fetch(f, w.pid, args...; kwargs...) + Threads.@spawn Distributed.remotecall_fetch(f, w.pid, args...; kwargs...) end function remote_call_fetch(f, w::DistributedStdlibWorker, args...; kwargs...) diff --git a/src/Malt.jl b/src/Malt.jl index 043392e..8008774 100644 --- a/src/Malt.jl +++ b/src/Malt.jl @@ -123,7 +123,7 @@ mutable struct Worker <: AbstractWorker # There's no reason to keep the worker process alive after the manager loses its handle. - w = finalizer(w -> @async(stop(w)), + w = finalizer(w -> Threads.@spawn(stop(w)), new( port, proc, @@ -147,7 +147,7 @@ Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port) with function _exit_loop(worker::Worker) - @async for _i in Iterators.countfrom(1) + Threads.@spawn for _i in Iterators.countfrom(1) try if !isrunning(worker) # the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply. @@ -172,7 +172,7 @@ function _receive_loop(worker::Worker) # instead of # `while true` # as a workaround for https://github.com/JuliaLang/julia/issues/37154 - @async for _i in Iterators.countfrom(1) + Threads.@spawn for _i in Iterators.countfrom(1) try if !isopen(io) @debug("HOST: io closed.") @@ -283,7 +283,7 @@ _new_do_msg(f::Function, args, kwargs) = ( # # TODO: `while` instead of `if`? # if w.current_socket === nothing || !isopen(w.current_socket) # w.current_socket = connect(w.port) -# @async _receive_loop(w) +# Threads.@spawn _receive_loop(w) # end # return w # end @@ -337,12 +337,12 @@ function _send_receive(w::Worker, msg_type::UInt8, msg_data) end """ -`@async(_wait_for_response) ∘ _send_msg` +`Threads.@spawn(_wait_for_response) ∘ _send_msg` """ function _send_receive_async(w::Worker, msg_type::UInt8, msg_data, output_transformation=identity)::Task # TODO: Unwrap TaskFailedExceptions msg_id = _send_msg(w, msg_type, msg_data, true) - return @async output_transformation(_wait_for_response(w, msg_id)) + return Threads.@spawn output_transformation(_wait_for_response(w, msg_id)) end @@ -375,7 +375,7 @@ function remote_call(f, w::Worker, args...; kwargs...) ) end function remote_call(f, w::InProcessWorker, args...; kwargs...) - w.latest_request_task = @async remote_call_fetch(f, w, args...; kwargs...) + w.latest_request_task = Threads.@spawn remote_call_fetch(f, w, args...; kwargs...) end function remote_call_fetch(f, w::InProcessWorker, args...; kwargs...) try @@ -446,7 +446,7 @@ function remote_do(f, w::Worker, args...; kwargs...) nothing end function remote_do(f, ::InProcessWorker, args...; kwargs...) - @async f(args...; kwargs...) + Threads.@spawn f(args...; kwargs...) nothing end @@ -645,7 +645,7 @@ function _rethrow_to_repl(e::InterruptException; rethrow_regular::Bool=false) Base.active_repl_backend.in_eval @debug "HOST: Rethrowing interrupt to REPL" - @async Base.schedule(Base.active_repl_backend.backend_task, e; error=true) + Threads.@spawn Base.schedule(Base.active_repl_backend.backend_task, e; error=true) elseif rethrow_regular @debug "HOST: Don't know what to do with this interrupt, rethrowing" exception = (e, catch_backtrace()) rethrow(e) diff --git a/src/worker.jl b/src/worker.jl index 936564d..e600a8a 100644 --- a/src/worker.jl +++ b/src/worker.jl @@ -115,7 +115,7 @@ interrupt(::Nothing) = nothing function handle(::Val{MsgType.from_host_call_with_response}, socket, msg, msg_id::MsgID) f, args, kwargs, respond_with_nothing = msg - @async begin + Threads.@spawn begin result, success = try result = f(args...; kwargs...) @@ -139,7 +139,7 @@ end function handle(::Val{MsgType.from_host_call_without_response}, socket, msg, msg_id::MsgID) f, args, kwargs, _ignored = msg - @async try + Threads.@spawn try f(args...; kwargs...) catch e @warn("WORKER: Got exception while running call without response", exception=(e, catch_backtrace())) diff --git a/test/basic.jl b/test/basic.jl index 28b67fe..4017f11 100644 --- a/test/basic.jl +++ b/test/basic.jl @@ -128,7 +128,7 @@ - t = @async begin + t = Threads.@spawn begin for i in 1:2*channel_size @test take!(lc) == i end