Skip to content

Commit

Permalink
propagate errors on wait(::RemoteRef) and remotecall_wait
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Oct 23, 2015
1 parent 8ada3cf commit a2e7cf4
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
31 changes: 23 additions & 8 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,9 @@ function remotecall_wait(f, w::Worker, args...)
rv.waitingfor = w.id
rr = RemoteRef(w)
send_msg(w, CallWaitMsg(f, args, rr2id(rr), prid))
wait(rv)
v = fetch(rv.c)
delete!(PGRP.refs, prid)
isa(v, RemoteException) && throw(v)
rr
end

Expand Down Expand Up @@ -783,8 +784,18 @@ function call_on_owner(f, rr::RemoteRef, args...)
end
end

wait_ref(rid, args...) = (wait(lookup_ref(rid).c, args...); nothing)
wait(r::RemoteRef, args...) = (call_on_owner(wait_ref, r, args...); r)
function wait_ref(rid, callee, args...)
v = fetch_ref(rid, args...)
if isa(v, RemoteException)
if myid() == callee
throw(v)
else
return v
end
end
nothing
end
wait(r::RemoteRef, args...) = (call_on_owner(wait_ref, r, myid(), args...); r)

fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...)
fetch(r::RemoteRef, args...) = call_on_owner(fetch_ref, r, args...)
Expand All @@ -796,19 +807,23 @@ put_ref(rid, args...) = put!(lookup_ref(rid), args...)
put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr)

take!(rv::RemoteValue, args...) = take!(rv.c, args...)
take_ref(rid, args...) = take!(lookup_ref(rid), args...)
take!(rr::RemoteRef, args...) = call_on_owner(take_ref, rr, args...)
function take_ref(rid, callee, args...)
v=take!(lookup_ref(rid), args...)
isa(v, RemoteException) && (myid() == callee) && throw(v)
v
end
take!(rr::RemoteRef, args...) = call_on_owner(take_ref, rr, myid(), args...)

close_ref(rid) = (close(lookup_ref(rid).c); nothing)
close(rr::RemoteRef) = call_on_owner(close_ref, rr)


function deliver_result(sock::IO, msg, oid, value)
#print("$(myid()) sending result $oid\n")
if is(msg,:call_fetch)
if is(msg,:call_fetch) || isa(value, RemoteException)
val = value
else
val = oid
val = :OK
end
try
send_msg_now(sock, ResultMsg(oid, val))
Expand Down Expand Up @@ -903,7 +918,7 @@ end
function handle_msg(msg::CallWaitMsg, r_stream, w_stream)
@schedule begin
rv = schedule_call(msg.response_oid, ()->msg.f(msg.args...))
deliver_result(w_stream, :call_wait, msg.notify_oid, wait(rv))
deliver_result(w_stream, :call_wait, msg.notify_oid, fetch(rv.c))
end
end

Expand Down
26 changes: 18 additions & 8 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,24 @@ catch ex
@test collect(1:5) == sort(map(x->parse(Int, x), errors))
end

try
remotecall_fetch(()->throw(ErrorException("foobar")), id_other)
error("unexpected")
catch ex
@test typeof(ex) == RemoteException
@test typeof(ex.captured) == CapturedException
@test typeof(ex.captured.ex) == ErrorException
@test ex.captured.ex.msg == "foobar"
macro test_remoteexception_thrown(expr)
quote
try
$(esc(expr))
error("unexpected")
catch ex
@test typeof(ex) == RemoteException
@test typeof(ex.captured) == CapturedException
@test typeof(ex.captured.ex) == ErrorException
@test ex.captured.ex.msg == "foobar"
end
end
end

for id in [id_other, id_me]
@test_remoteexception_thrown remotecall_fetch(()->throw(ErrorException("foobar")), id)
@test_remoteexception_thrown remotecall_wait(()->throw(ErrorException("foobar")), id)
@test_remoteexception_thrown wait(remotecall(()->throw(ErrorException("foobar")), id))
end

# The below block of tests are usually run only on local development systems, since:
Expand Down

0 comments on commit a2e7cf4

Please sign in to comment.