Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Threads.@spawn instead of @async #86

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/DistributedStdlibWorker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mutable struct DistributedStdlibWorker <: AbstractWorker
# TODO: process preamble from Pluto?

# There's no reason to keep the worker process alive after the manager loses its handle.
w = finalizer(w -> @async(stop(w)),
w = finalizer(w -> Threads.@spawn(stop(w)),
new(pid, true)
)
atexit(() -> stop(w))
Expand All @@ -52,7 +52,7 @@ macro transform_exception(worker, ex)
end

function remote_call(f, w::DistributedStdlibWorker, args...; kwargs...)
@async Distributed.remotecall_fetch(f, w.pid, args...; kwargs...)
Threads.@spawn Distributed.remotecall_fetch(f, w.pid, args...; kwargs...)
end

function remote_call_fetch(f, w::DistributedStdlibWorker, args...; kwargs...)
Expand Down
18 changes: 9 additions & 9 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mutable struct Worker <: AbstractWorker


# There's no reason to keep the worker process alive after the manager loses its handle.
w = finalizer(w -> @async(stop(w)),
w = finalizer(w -> Threads.@spawn(stop(w)),
new(
port,
proc,
Expand All @@ -147,7 +147,7 @@ Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port) with


function _exit_loop(worker::Worker)
@async for _i in Iterators.countfrom(1)
Threads.@spawn for _i in Iterators.countfrom(1)
try
if !isrunning(worker)
# the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply.
Expand All @@ -172,7 +172,7 @@ function _receive_loop(worker::Worker)
# instead of
# `while true`
# as a workaround for https://github.com/JuliaLang/julia/issues/37154
@async for _i in Iterators.countfrom(1)
Threads.@spawn for _i in Iterators.countfrom(1)
try
if !isopen(io)
@debug("HOST: io closed.")
Expand Down Expand Up @@ -283,7 +283,7 @@ _new_do_msg(f::Function, args, kwargs) = (
# # TODO: `while` instead of `if`?
# if w.current_socket === nothing || !isopen(w.current_socket)
# w.current_socket = connect(w.port)
# @async _receive_loop(w)
# Threads.@spawn _receive_loop(w)
# end
# return w
# end
Expand Down Expand Up @@ -337,12 +337,12 @@ function _send_receive(w::Worker, msg_type::UInt8, msg_data)
end

"""
`@async(_wait_for_response) ∘ _send_msg`
`Threads.@spawn(_wait_for_response) ∘ _send_msg`
"""
function _send_receive_async(w::Worker, msg_type::UInt8, msg_data, output_transformation=identity)::Task
# TODO: Unwrap TaskFailedExceptions
msg_id = _send_msg(w, msg_type, msg_data, true)
return @async output_transformation(_wait_for_response(w, msg_id))
return Threads.@spawn output_transformation(_wait_for_response(w, msg_id))
end


Expand Down Expand Up @@ -375,7 +375,7 @@ function remote_call(f, w::Worker, args...; kwargs...)
)
end
function remote_call(f, w::InProcessWorker, args...; kwargs...)
w.latest_request_task = @async remote_call_fetch(f, w, args...; kwargs...)
w.latest_request_task = Threads.@spawn remote_call_fetch(f, w, args...; kwargs...)
end
function remote_call_fetch(f, w::InProcessWorker, args...; kwargs...)
try
Expand Down Expand Up @@ -446,7 +446,7 @@ function remote_do(f, w::Worker, args...; kwargs...)
nothing
end
function remote_do(f, ::InProcessWorker, args...; kwargs...)
@async f(args...; kwargs...)
Threads.@spawn f(args...; kwargs...)
nothing
end

Expand Down Expand Up @@ -645,7 +645,7 @@ function _rethrow_to_repl(e::InterruptException; rethrow_regular::Bool=false)
Base.active_repl_backend.in_eval

@debug "HOST: Rethrowing interrupt to REPL"
@async Base.schedule(Base.active_repl_backend.backend_task, e; error=true)
Threads.@spawn Base.schedule(Base.active_repl_backend.backend_task, e; error=true)
elseif rethrow_regular
@debug "HOST: Don't know what to do with this interrupt, rethrowing" exception = (e, catch_backtrace())
rethrow(e)
Expand Down
4 changes: 2 additions & 2 deletions src/worker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ interrupt(::Nothing) = nothing
function handle(::Val{MsgType.from_host_call_with_response}, socket, msg, msg_id::MsgID)
f, args, kwargs, respond_with_nothing = msg

@async begin
Threads.@spawn begin
result, success = try
result = f(args...; kwargs...)

Expand All @@ -139,7 +139,7 @@ end
function handle(::Val{MsgType.from_host_call_without_response}, socket, msg, msg_id::MsgID)
f, args, kwargs, _ignored = msg

@async try
Threads.@spawn try
f(args...; kwargs...)
catch e
@warn("WORKER: Got exception while running call without response", exception=(e, catch_backtrace()))
Expand Down
2 changes: 1 addition & 1 deletion test/basic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@



t = @async begin
t = Threads.@spawn begin
for i in 1:2*channel_size
@test take!(lc) == i
end
Expand Down
Loading