Skip to content

Commit

Permalink
ZMQ Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Nov 25, 2024
1 parent 5b3b1ec commit 5730312
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 192 deletions.
54 changes: 32 additions & 22 deletions src/Rembus.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1271,17 +1271,13 @@ function call_request(rb, msg)
result = unreactive(rb, raise=false, wait=msg.request.wait)
end
else
try
result = rpc(
rb,
msg.request.topic,
msg.request.data,
raise=false,
wait=msg.request.wait
)
catch e
@error "call_request: $e"
end
result = rpc(
rb,
msg.request.topic,
msg.request.data,
raise=false,
wait=msg.request.wait
)
end
reply(msg, result)
end
Expand Down Expand Up @@ -1656,7 +1652,7 @@ end
acks_file(c::RbURL) = joinpath(rembus_dir(), "$(c.id).acks")

#=
load_pubsub_received(rb::RBHandle)
load_pubsub_received(component::RbURL)
Load from file the ids of received Pub/Sub messages
awaiting Ack2 acknowledgements.
Expand Down Expand Up @@ -2240,21 +2236,17 @@ The `process` supervisor try to auto-reconnect if an exception occurs.
function connect(process::Visor.Supervised, rb::RBHandle)
_connect(rb, process)
authenticate(rb)
callbacks(rb, rb.receiver, rb.subinfo)
callbacks(rb)
return rb
end

function bind(process::Visor.Supervised, rb::RBServerConnection)
server = rb.router
callbacks(rb, server.topic_function, server.subinfo)
callbacks(rb)
return rb
end

#=
Notify all subscribed and exposed method to the remote node.
This happens just after a connection establishement.
=#
function callbacks(rb::RBHandle, fnmap, submap)
function _callbacks(rb::RBHandle, fnmap, submap)
for (name, fn) in fnmap
if haskey(submap, name)
subscribe_server(rb, name, from=submap[name])
Expand All @@ -2268,6 +2260,23 @@ function callbacks(rb::RBHandle, fnmap, submap)
end
end

#=
Notify all subscribed and exposed method to the remote node.
This happens just after a connection establishement.
=#
function callbacks(rb::RBServerConnection)
_callbacks(rb, rb.router.topic_function, rb.router.subinfo)
end

function callbacks(rb::RBConnection)
_callbacks(rb, rb.receiver, rb.subinfo)
end

function callbacks(twin::Twin)
# Acctually the broker does not declares to connecting nodes
# the list of exposed and subscribed methods.
end

function connect(rb::RBPool)
for c in rb.connections
try
Expand Down Expand Up @@ -2326,8 +2335,6 @@ function close_handle(rb)
return nothing
end

Base.close(rb::RBHandle) = close_handle(rb)

function Base.close(rb::RBServerConnection)
close_handle(rb)
filter!(rb.router.connections) do conn
Expand Down Expand Up @@ -3164,7 +3171,10 @@ function rpcreq(
wait=true,
broadcast=false
)
!isconnected(handle) && error("connection is down")
if !isconnected(handle)
do_request(nothing, msg, wait, timeout, raise)
end

if isa(handle, RBPool)
if broadcast
conn = handle.connections
Expand Down
213 changes: 48 additions & 165 deletions src/broker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ build_twin(router::Router, id, type) = Twin(router, id, type)

function build_twin(router::Server, id, type)
rb = RBServerConnection(router, id, type)
push!(router.connections, rb)
if type === zrouter
rb.socket = router.zmqsocket
end
Expand Down Expand Up @@ -639,7 +640,6 @@ function unregister(router, twin, msg)
save_token_app(router, router.component_owner)
end
return ResMsg(msg.id, sts, reason)
###put!(twin.process.inbox, response)
end

function rpc_response(router, twin, msg)
Expand Down Expand Up @@ -988,100 +988,23 @@ function commands_permitted(twin)
return true
end

function zeromq_receiver(router::Server)
pkt = ZMQPacket()
while true
try
zmq_message(router, pkt)
id = pkt.identity
if haskey(router.address2twin, id)
twin = router.address2twin[id]
else
@debug "creating anonymous twin from identity $id ($(bytes2zid(id)))"
# create the twin
twin = create_twin(string(bytes2zid(id)), router, zrouter)
@debug "[anonymous] client bound to twin id [$twin]"
router.address2twin[id] = twin
router.twin2address[ucid(twin)] = id
twin.zaddress = id
twin.socket = router.zmqsocket
end

msg::RembusMsg = broker_parse(pkt, false)
#@mlog("[ZMQ][$twin] <- $(prettystr(msg))")

if isa(msg, IdentityMsg)
@debug "[$twin] auth identity: $(msg.cid)"
# check if cid is registered
if key_file(router, msg.cid) !== nothing
# authentication mode, send the challenge
response = challenge(router, twin, msg.id)
#@mlog("[ZMQ][$twin] -> $response")
transport_send(Val(twin.type), twin, response)
else
identity_upgrade(router, twin, msg, id, authenticate=false)
end
server = twin.router
callbacks(twin, server.topic_function, server.subinfo)
elseif isa(msg, PingMsg)
if (twin.id != msg.cid)

# broker restarted
# start the authentication flow if cid is registered
@debug "lost connection to broker: restarting $(msg.cid)"
if key_file(router, msg.cid) !== nothing
# check if challenge was already sent
if !haskey(twin.session, "challenge")
response = challenge(router, twin, msg.id)
transport_send(Val(twin.type), twin, response)
end
else
identity_upgrade(router, twin, msg, id, authenticate=false)
end

else
if twin.socket !== nothing
pong(twin.socket, msg.id, id)
end
end
elseif isa(msg, Attestation)
identity_upgrade(router, twin, msg, id, authenticate=true)
elseif isa(msg, Register)
response = register(router, msg)
put!(twin.process.inbox, response)
elseif isa(msg, Close)
offline!(twin)
elseif isa(msg, AdminReqMsg)
admin_msg(router, twin, msg)
else
@async handle_input(twin, msg)
end
catch e
if isa(e, Visor.ProcessInterrupt) || isa(e, ZMQ.StateError)
rethrow()
end
@warn "[ZMQ] recv error: $e"
@showerror e
end
end
end

#=
Parse the message and invoke the actions related to the message type.
=#
function eval_message(twin::Twin, msg)
function eval_message(twin, msg, id=UInt8[])
router = twin.router
if isa(msg, IdentityMsg)
@debug "[$twin] auth identity: $(msg.cid)"
# check if cid is registered
if key_file(router, msg.cid) !== nothing
# authentication mode, send the challenge
response = challenge(router, twin, msg.id)
transport_send(Val(twin.type), twin, response)
else
return identity_upgrade(router, twin, msg, id, authenticate=false)
identity_upgrade(router, twin, msg, id, authenticate=false)
end
#@mlog("[ZMQ][$twin] -> $response")
transport_send(Val(twin.type), twin, response)
callbacks(twin)
elseif isa(msg, PingMsg)
if (twin.id != msg.cid)

Expand Down Expand Up @@ -1110,24 +1033,8 @@ function eval_message(twin::Twin, msg)
put!(twin.process.inbox, response)
elseif isa(msg, Close)
offline!(twin)
elseif commands_permitted(twin)
if isa(msg, Unregister)
response = unregister(router, twin, msg)
put!(twin.process.inbox, response)
elseif isa(msg, ResMsg)
rpc_response(router, twin, msg)
elseif isa(msg, AdminReqMsg)
admin_msg(router, twin, msg)
elseif isa(msg, RpcReqMsg)
rpc_request(router, twin, msg)
elseif isa(msg, PubSubMsg)
pubsub_msg(router, twin, msg)
elseif isa(msg, AckMsg)
ack_msg(twin, msg)
end
else
@info "[$twin]: [$msg] not authorized"
offline!(twin)
command(router, twin, msg)
end

return nothing
Expand Down Expand Up @@ -1156,10 +1063,43 @@ function zmq_receive(rb::Twin)
@debug "zmq socket closed"
end

function command(router::Router, twin, msg)
if commands_permitted(twin)
if isa(msg, Unregister)
response = unregister(router, twin, msg)
put!(twin.process.inbox, response)
elseif isa(msg, ResMsg)
rpc_response(router, twin, msg)
elseif isa(msg, AdminReqMsg)
admin_msg(router, twin, msg)
elseif isa(msg, RpcReqMsg)
rpc_request(router, twin, msg)
elseif isa(msg, PubSubMsg)
pubsub_msg(router, twin, msg)
elseif isa(msg, AckMsg)
ack_msg(twin, msg)
elseif isa(msg, Ack2Msg)
# the broker ignores Ack2 message
end
else
@info "[$twin]: [$msg] not authorized"
offline!(twin)
end
end


function command(router::Server, twin, msg)
if isa(msg, AdminReqMsg)
admin_msg(router, twin, msg)
else
@async handle_input(twin, msg)
end
end

#=
Broker zmq receiver.
Broker and Server zmq receiver.
=#
function zeromq_receiver(router::Router)
function zeromq_receiver(router)
pkt = ZMQPacket()
while true
try
Expand All @@ -1179,69 +1119,9 @@ function zeromq_receiver(router::Router)
twin.socket = router.zmqsocket
end

msg::RembusMsg = broker_parse(pkt)
#@mlog("[ZMQ][$twin] <- $(prettystr(msg))")
if isa(msg, IdentityMsg)
@debug "[$twin] auth identity: $(msg.cid)"
# check if cid is registered
if key_file(router, msg.cid) !== nothing
# authentication mode, send the challenge
response = challenge(router, twin, msg.id)
else
identity_upgrade(router, twin, msg, id, authenticate=false)
continue
end
#@mlog("[ZMQ][$twin] -> $response")
transport_send(Val(twin.type), twin, response)
elseif isa(msg, PingMsg)
if (twin.id != msg.cid)

# broker restarted
# start the authentication flow if cid is registered
@debug "lost connection to broker: restarting $(msg.cid)"
if key_file(router, msg.cid) !== nothing
# check if challenge was already sent
if !haskey(twin.session, "challenge")
response = challenge(router, twin, msg.id)
transport_send(Val(twin.type), twin, response)
end
else
identity_upgrade(router, twin, msg, id, authenticate=false)
end
msg::RembusMsg = broker_parse(pkt, isa(router, Router))

else
if twin.socket !== nothing
pong(twin.socket, msg.id, id)
end
end
elseif isa(msg, Attestation)
identity_upgrade(router, twin, msg, id, authenticate=true)
elseif isa(msg, Register)
response = register(router, msg)
put!(twin.process.inbox, response)
elseif isa(msg, Close)
offline!(twin)
elseif commands_permitted(twin)
if isa(msg, Unregister)
response = unregister(router, twin, msg)
put!(twin.process.inbox, response)
elseif isa(msg, ResMsg)
rpc_response(router, twin, msg)
elseif isa(msg, AdminReqMsg)
admin_msg(router, twin, msg)
elseif isa(msg, RpcReqMsg)
rpc_request(router, twin, msg)
elseif isa(msg, PubSubMsg)
pubsub_msg(router, twin, msg)
elseif isa(msg, AckMsg)
ack_msg(twin, msg)
elseif isa(msg, Ack2Msg)
# the broker ignores Ack2 message
end
else
@info "[$twin]: [$msg] not authorized"
offline!(twin)
end
eval_message(twin, msg, id)
catch e
if isa(e, Visor.ProcessInterrupt) || isa(e, ZMQ.StateError)
rethrow()
Expand Down Expand Up @@ -2867,8 +2747,11 @@ end
Add a server.
=#
function add_node(router, component)
connect(router, component)
push!(router.servers, cid(component))
url = cid(component)
if !(url in router.servers)
push!(router.servers, url)
connect(router, component)
end
end

add_node(router, url::AbstractString) = add_node(router, RbURL(url))
Expand Down
Loading

0 comments on commit 5730312

Please sign in to comment.