diff --git a/src/Rembus.jl b/src/Rembus.jl index b07fbb9..e1b05ee 100644 --- a/src/Rembus.jl +++ b/src/Rembus.jl @@ -1271,17 +1271,13 @@ function call_request(rb, msg) result = unreactive(rb, raise=false, wait=msg.request.wait) end else - try - result = rpc( - rb, - msg.request.topic, - msg.request.data, - raise=false, - wait=msg.request.wait - ) - catch e - @error "call_request: $e" - end + result = rpc( + rb, + msg.request.topic, + msg.request.data, + raise=false, + wait=msg.request.wait + ) end reply(msg, result) end @@ -1656,7 +1652,7 @@ end acks_file(c::RbURL) = joinpath(rembus_dir(), "$(c.id).acks") #= - load_pubsub_received(rb::RBHandle) + load_pubsub_received(component::RbURL) Load from file the ids of received Pub/Sub messages awaiting Ack2 acknowledgements. @@ -2240,21 +2236,17 @@ The `process` supervisor try to auto-reconnect if an exception occurs. function connect(process::Visor.Supervised, rb::RBHandle) _connect(rb, process) authenticate(rb) - callbacks(rb, rb.receiver, rb.subinfo) + callbacks(rb) return rb end function bind(process::Visor.Supervised, rb::RBServerConnection) server = rb.router - callbacks(rb, server.topic_function, server.subinfo) + callbacks(rb) return rb end -#= -Notify all subscribed and exposed method to the remote node. -This happens just after a connection establishement. -=# -function callbacks(rb::RBHandle, fnmap, submap) +function _callbacks(rb::RBHandle, fnmap, submap) for (name, fn) in fnmap if haskey(submap, name) subscribe_server(rb, name, from=submap[name]) @@ -2268,6 +2260,23 @@ function callbacks(rb::RBHandle, fnmap, submap) end end +#= +Notify all subscribed and exposed method to the remote node. +This happens just after a connection establishement. +=# +function callbacks(rb::RBServerConnection) + _callbacks(rb, rb.router.topic_function, rb.router.subinfo) +end + +function callbacks(rb::RBConnection) + _callbacks(rb, rb.receiver, rb.subinfo) +end + +function callbacks(twin::Twin) + # Acctually the broker does not declares to connecting nodes + # the list of exposed and subscribed methods. +end + function connect(rb::RBPool) for c in rb.connections try @@ -2326,8 +2335,6 @@ function close_handle(rb) return nothing end -Base.close(rb::RBHandle) = close_handle(rb) - function Base.close(rb::RBServerConnection) close_handle(rb) filter!(rb.router.connections) do conn @@ -3164,7 +3171,10 @@ function rpcreq( wait=true, broadcast=false ) - !isconnected(handle) && error("connection is down") + if !isconnected(handle) + do_request(nothing, msg, wait, timeout, raise) + end + if isa(handle, RBPool) if broadcast conn = handle.connections diff --git a/src/broker.jl b/src/broker.jl index 355d57d..576d868 100644 --- a/src/broker.jl +++ b/src/broker.jl @@ -341,6 +341,7 @@ build_twin(router::Router, id, type) = Twin(router, id, type) function build_twin(router::Server, id, type) rb = RBServerConnection(router, id, type) + push!(router.connections, rb) if type === zrouter rb.socket = router.zmqsocket end @@ -639,7 +640,6 @@ function unregister(router, twin, msg) save_token_app(router, router.component_owner) end return ResMsg(msg.id, sts, reason) - ###put!(twin.process.inbox, response) end function rpc_response(router, twin, msg) @@ -988,88 +988,10 @@ function commands_permitted(twin) return true end -function zeromq_receiver(router::Server) - pkt = ZMQPacket() - while true - try - zmq_message(router, pkt) - id = pkt.identity - if haskey(router.address2twin, id) - twin = router.address2twin[id] - else - @debug "creating anonymous twin from identity $id ($(bytes2zid(id)))" - # create the twin - twin = create_twin(string(bytes2zid(id)), router, zrouter) - @debug "[anonymous] client bound to twin id [$twin]" - router.address2twin[id] = twin - router.twin2address[ucid(twin)] = id - twin.zaddress = id - twin.socket = router.zmqsocket - end - - msg::RembusMsg = broker_parse(pkt, false) - #@mlog("[ZMQ][$twin] <- $(prettystr(msg))") - - if isa(msg, IdentityMsg) - @debug "[$twin] auth identity: $(msg.cid)" - # check if cid is registered - if key_file(router, msg.cid) !== nothing - # authentication mode, send the challenge - response = challenge(router, twin, msg.id) - #@mlog("[ZMQ][$twin] -> $response") - transport_send(Val(twin.type), twin, response) - else - identity_upgrade(router, twin, msg, id, authenticate=false) - end - server = twin.router - callbacks(twin, server.topic_function, server.subinfo) - elseif isa(msg, PingMsg) - if (twin.id != msg.cid) - - # broker restarted - # start the authentication flow if cid is registered - @debug "lost connection to broker: restarting $(msg.cid)" - if key_file(router, msg.cid) !== nothing - # check if challenge was already sent - if !haskey(twin.session, "challenge") - response = challenge(router, twin, msg.id) - transport_send(Val(twin.type), twin, response) - end - else - identity_upgrade(router, twin, msg, id, authenticate=false) - end - - else - if twin.socket !== nothing - pong(twin.socket, msg.id, id) - end - end - elseif isa(msg, Attestation) - identity_upgrade(router, twin, msg, id, authenticate=true) - elseif isa(msg, Register) - response = register(router, msg) - put!(twin.process.inbox, response) - elseif isa(msg, Close) - offline!(twin) - elseif isa(msg, AdminReqMsg) - admin_msg(router, twin, msg) - else - @async handle_input(twin, msg) - end - catch e - if isa(e, Visor.ProcessInterrupt) || isa(e, ZMQ.StateError) - rethrow() - end - @warn "[ZMQ] recv error: $e" - @showerror e - end - end -end - #= Parse the message and invoke the actions related to the message type. =# -function eval_message(twin::Twin, msg) +function eval_message(twin, msg, id=UInt8[]) router = twin.router if isa(msg, IdentityMsg) @debug "[$twin] auth identity: $(msg.cid)" @@ -1077,11 +999,12 @@ function eval_message(twin::Twin, msg) if key_file(router, msg.cid) !== nothing # authentication mode, send the challenge response = challenge(router, twin, msg.id) + transport_send(Val(twin.type), twin, response) else - return identity_upgrade(router, twin, msg, id, authenticate=false) + identity_upgrade(router, twin, msg, id, authenticate=false) end #@mlog("[ZMQ][$twin] -> $response") - transport_send(Val(twin.type), twin, response) + callbacks(twin) elseif isa(msg, PingMsg) if (twin.id != msg.cid) @@ -1110,24 +1033,8 @@ function eval_message(twin::Twin, msg) put!(twin.process.inbox, response) elseif isa(msg, Close) offline!(twin) - elseif commands_permitted(twin) - if isa(msg, Unregister) - response = unregister(router, twin, msg) - put!(twin.process.inbox, response) - elseif isa(msg, ResMsg) - rpc_response(router, twin, msg) - elseif isa(msg, AdminReqMsg) - admin_msg(router, twin, msg) - elseif isa(msg, RpcReqMsg) - rpc_request(router, twin, msg) - elseif isa(msg, PubSubMsg) - pubsub_msg(router, twin, msg) - elseif isa(msg, AckMsg) - ack_msg(twin, msg) - end else - @info "[$twin]: [$msg] not authorized" - offline!(twin) + command(router, twin, msg) end return nothing @@ -1156,10 +1063,43 @@ function zmq_receive(rb::Twin) @debug "zmq socket closed" end +function command(router::Router, twin, msg) + if commands_permitted(twin) + if isa(msg, Unregister) + response = unregister(router, twin, msg) + put!(twin.process.inbox, response) + elseif isa(msg, ResMsg) + rpc_response(router, twin, msg) + elseif isa(msg, AdminReqMsg) + admin_msg(router, twin, msg) + elseif isa(msg, RpcReqMsg) + rpc_request(router, twin, msg) + elseif isa(msg, PubSubMsg) + pubsub_msg(router, twin, msg) + elseif isa(msg, AckMsg) + ack_msg(twin, msg) + elseif isa(msg, Ack2Msg) + # the broker ignores Ack2 message + end + else + @info "[$twin]: [$msg] not authorized" + offline!(twin) + end +end + + +function command(router::Server, twin, msg) + if isa(msg, AdminReqMsg) + admin_msg(router, twin, msg) + else + @async handle_input(twin, msg) + end +end + #= -Broker zmq receiver. +Broker and Server zmq receiver. =# -function zeromq_receiver(router::Router) +function zeromq_receiver(router) pkt = ZMQPacket() while true try @@ -1179,69 +1119,9 @@ function zeromq_receiver(router::Router) twin.socket = router.zmqsocket end - msg::RembusMsg = broker_parse(pkt) - #@mlog("[ZMQ][$twin] <- $(prettystr(msg))") - if isa(msg, IdentityMsg) - @debug "[$twin] auth identity: $(msg.cid)" - # check if cid is registered - if key_file(router, msg.cid) !== nothing - # authentication mode, send the challenge - response = challenge(router, twin, msg.id) - else - identity_upgrade(router, twin, msg, id, authenticate=false) - continue - end - #@mlog("[ZMQ][$twin] -> $response") - transport_send(Val(twin.type), twin, response) - elseif isa(msg, PingMsg) - if (twin.id != msg.cid) - - # broker restarted - # start the authentication flow if cid is registered - @debug "lost connection to broker: restarting $(msg.cid)" - if key_file(router, msg.cid) !== nothing - # check if challenge was already sent - if !haskey(twin.session, "challenge") - response = challenge(router, twin, msg.id) - transport_send(Val(twin.type), twin, response) - end - else - identity_upgrade(router, twin, msg, id, authenticate=false) - end + msg::RembusMsg = broker_parse(pkt, isa(router, Router)) - else - if twin.socket !== nothing - pong(twin.socket, msg.id, id) - end - end - elseif isa(msg, Attestation) - identity_upgrade(router, twin, msg, id, authenticate=true) - elseif isa(msg, Register) - response = register(router, msg) - put!(twin.process.inbox, response) - elseif isa(msg, Close) - offline!(twin) - elseif commands_permitted(twin) - if isa(msg, Unregister) - response = unregister(router, twin, msg) - put!(twin.process.inbox, response) - elseif isa(msg, ResMsg) - rpc_response(router, twin, msg) - elseif isa(msg, AdminReqMsg) - admin_msg(router, twin, msg) - elseif isa(msg, RpcReqMsg) - rpc_request(router, twin, msg) - elseif isa(msg, PubSubMsg) - pubsub_msg(router, twin, msg) - elseif isa(msg, AckMsg) - ack_msg(twin, msg) - elseif isa(msg, Ack2Msg) - # the broker ignores Ack2 message - end - else - @info "[$twin]: [$msg] not authorized" - offline!(twin) - end + eval_message(twin, msg, id) catch e if isa(e, Visor.ProcessInterrupt) || isa(e, ZMQ.StateError) rethrow() @@ -2867,8 +2747,11 @@ end Add a server. =# function add_node(router, component) - connect(router, component) - push!(router.servers, cid(component)) + url = cid(component) + if !(url in router.servers) + push!(router.servers, url) + connect(router, component) + end end add_node(router, url::AbstractString) = add_node(router, RbURL(url)) diff --git a/src/transport.jl b/src/transport.jl index f332184..0d23e56 100644 --- a/src/transport.jl +++ b/src/transport.jl @@ -602,7 +602,7 @@ function transport_send(::Val{zdealer}, rb, msg::PubSubMsg) end function transport_send(::Val{zrouter}, twin, msg::PubSubMsg) - address = twin.router.twin2address[twin.id] + address = twin.zaddress data = data2message(msg.data) outcome = true if msg.flags > QOS0 @@ -676,7 +676,7 @@ function transport_send(::Val{socket}, rb::RBHandle, msg::RpcReqMsg) end function transport_send(::Val{zrouter}, rb, msg::RpcReqMsg) - address = rb.router.twin2address[rb.id] + address = rb.zaddress lock(zmqsocketlock) do send(rb.socket, address, more=true) send(rb.socket, Message(), more=true) diff --git a/test/ack/test_qos2.jl b/test/ack/test_qos2.jl index e4987a2..56d6f7e 100644 --- a/test/ack/test_qos2.jl +++ b/test/ack/test_qos2.jl @@ -18,7 +18,7 @@ function sub_ingress(rb, msg) elseif isa(msg, Rembus.Ack2Msg) rb.shared.msgid["sub_ack2"] = msg.id # to simulate an ACK2 message lost - #return nothing + return nothing end return response end @@ -88,7 +88,7 @@ function run() @info "messages awaiting ack2:\n$df" close(rb) close(pub_rb) - @test nrow(df) == 0 + @test nrow(df) == 1 end # cleanup files @@ -96,6 +96,9 @@ rm(joinpath(Rembus.rembus_dir(), "sub.acks"), force=true) execute(run, "test_qos2") +df = Rembus.load_pubsub_received(Rembus.RbURL("sub")) +@test !isempty(df) + if !Sys.iswindows() # expect one messages at rest df = Rembus.data_at_rest(string(1), BROKER_NAME) diff --git a/test/errors/test_zmq_invalid_message.jl b/test/errors/test_zmq_invalid_message.jl new file mode 100644 index 0000000..932d251 --- /dev/null +++ b/test/errors/test_zmq_invalid_message.jl @@ -0,0 +1,32 @@ +include("../utils.jl") +using ZMQ + +function wrong_message_type(address, socket) + lock(Rembus.zmqsocketlock) do + send(socket, address, more=true) + send(socket, Message(), more=true) + # send a wrong message id 0x17 + send(socket, encode([0x17, "topic"]), more=true) + send(socket, "data", more=true) + send(socket, Rembus.MESSAGE_END, more=false) + end +end + +function run() + srv = server(zmq=9001) + + bro = broker(wait=false, zmq=8002) + add_node(bro, "zmq://127.0.0.1:9001/server") + sleep(0.5) + conns = srv.connections + wrong_message_type(conns[1].zaddress, conns[1].socket) + @test length(bro.servers) == 1 +end + +@info "[test_zmq_invalid_message] start" +try + run() +finally + shutdown() +end +@info "[test_zmq_invalid_message] stop" diff --git a/test/multiplexer/test_rpc_future.jl b/test/multiplexer/test_rpc_future.jl index eb22d02..2ef7672 100644 --- a/test/multiplexer/test_rpc_future.jl +++ b/test/multiplexer/test_rpc_future.jl @@ -29,6 +29,17 @@ function run() unexpose(rb, component_service) close(rb) + + # no connections are available + rb = connect(["ws://:6000", "ws://:6001"]) + + @test_throws RembusError rpc(rb, "myservice") + + response = rpc(rb, "myservice", raise=false) + @test isa(response, RembusError) + + fut = rpc(rb, "myservice", wait=false) + @test_throws RembusError fetch_response(fut) end diff --git a/test/runtests.jl b/test/runtests.jl index b37417c..8532449 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -429,6 +429,9 @@ const GROUP = get(ENV, "GROUP", "all") @time @safetestset "serve_zmq_error" begin include("errors/test_serve_zmq_error.jl") end + @time @safetestset "zmq_invalid_message" begin + include("errors/test_zmq_invalid_message.jl") + end @time @safetestset "identity_empty_string" begin include("errors/test_identity_empty_string.jl") end diff --git a/test/tcp/test_tcp.jl b/test/tcp/test_tcp.jl index ab1ed88..e600ef5 100644 --- a/test/tcp/test_tcp.jl +++ b/test/tcp/test_tcp.jl @@ -27,7 +27,7 @@ function run() write(sock, UInt8[1, 2, 3]) sleep(1) # the wrong packet format close the connection - @test_throws ErrorException rpc(anonymous, "version") + @test_throws RembusError rpc(anonymous, "version") close(component1) end