Skip to content

Commit

Permalink
add synchronization to Distributed to make sure setup msg is sent first
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson committed Oct 13, 2018
1 parent ee0c194 commit a80a2af
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
2 changes: 2 additions & 0 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mutable struct Worker
manager::ClusterManager
config::WorkerConfig
version::Union{VersionNumber, Nothing} # Julia version of the remote process
initialized::Threads.Event

function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager;
version::Union{VersionNumber, Nothing}=nothing,
Expand All @@ -90,6 +91,7 @@ mutable struct Worker
return map_pid_wrkr[id]
end
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
w.initialized = Threads.Event()
register_worker(w)
w
end
Expand Down
3 changes: 3 additions & 0 deletions stdlib/Distributed/src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ end

function send_msg_(w::Worker, header, msg, now::Bool)
check_worker_state(w)
if myid() != 1 && !isa(msg, IdentifySocketMsg) && !isa(msg, IdentifySocketAckMsg)
wait(w.initialized)
end
io = w.w_stream
lock(io.lock)
try
Expand Down
5 changes: 4 additions & 1 deletion stdlib/Distributed/src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,10 @@ end

function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version)
# register a new peer worker connection
w=Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
send_connection_hdr(w, false)
send_msg_now(w, MsgHeader(), IdentifySocketAckMsg())
notify(w.initialized)
end

function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, version)
Expand All @@ -301,6 +302,7 @@ end
function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
LPROC.id = msg.self_pid
controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)
notify(controller.initialized)
register_worker(LPROC)
topology(msg.topology)

Expand Down Expand Up @@ -340,6 +342,7 @@ function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConf
process_messages(w.r_stream, w.w_stream, false)
send_connection_hdr(w, true)
send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid()))
notify(w.initialized)
catch e
@error "Error on $(myid()) while connecting to peer $rpid, exiting" exception=e,catch_backtrace()
exit(1)
Expand Down

0 comments on commit a80a2af

Please sign in to comment.