diff --git a/src/Rembus.jl b/src/Rembus.jl index b57d7c1..4f198aa 100644 --- a/src/Rembus.jl +++ b/src/Rembus.jl @@ -41,7 +41,6 @@ export @rpc_timeout export @forever export @shutdown -# rembus client api export component export connect export isauthenticated @@ -2422,6 +2421,9 @@ function close_handle(rb) save_pubsub_received(rb) end + for ackstate in values(rb.acktimer) + close(ackstate.timer) + end return nothing end diff --git a/src/transport.jl b/src/transport.jl index 92812a7..bd85f90 100644 --- a/src/transport.jl +++ b/src/transport.jl @@ -509,7 +509,13 @@ function transport_send( ), ACK_WAIT_TIME) ) pkt = [TYPE_PUB | msg.flags, id2bytes(msgid), msg.topic, msg.data] - broker_transport_write(twin.socket, pkt) + if twin.socket === nothing + # if the connection is closed stop trying to send pubsub messages + # that are not yet acknowledged. + put!(ack_cond, true) + else + broker_transport_write(twin.socket, pkt) + end outcome = fetch(ack_cond) delete!(twin.out, msgid) else @@ -550,7 +556,14 @@ function transport_send(::Val{socket}, rb::RBHandle, msg::PubSubMsg) ), ACK_WAIT_TIME) ) pkt = [TYPE_PUB | msg.flags, id2bytes(msgid), msg.topic, content] - transport_write(rb.socket, pkt) + if rb.socket === nothing + # if the connection is closed stop trying to send pubsub messages + # that are not yet acknowledged. + put!(ack_cond, true) + else + transport_write(rb.socket, pkt) + end + outcome = fetch(ack_cond) delete!(rb.out, msgid) else diff --git a/test/ack/test_client_ack_timeout.jl b/test/ack/test_client_ack_timeout.jl index d15797a..d1deddb 100644 --- a/test/ack/test_client_ack_timeout.jl +++ b/test/ack/test_client_ack_timeout.jl @@ -4,7 +4,6 @@ include("../utils.jl") function sub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.router.shared.msgid["sub_ack"] = msg.id return nothing @@ -15,7 +14,6 @@ end function sub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.PubSubMsg) rb.router.shared.msgid["sub_pubsub"] = msg.id elseif isa(msg, Rembus.Ack2Msg) @@ -26,7 +24,6 @@ end function pub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.shared.msgid["pub_ack"] = msg.id end @@ -35,8 +32,6 @@ end function pub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" - if isa(msg, Rembus.PubSubMsg) rb.shared.msgid["pub_pubsub"] = msg.id end @@ -81,7 +76,7 @@ end function run() ctx = Ctx() - topic = "qos2_topic" + topic = "client_ack_timeout_topic" rb = sub(topic, ctx) pub_rb = pub(topic, ctx) diff --git a/test/ack/test_qos0.jl b/test/ack/test_qos0.jl index aa87a9b..2aa7c18 100644 --- a/test/ack/test_qos0.jl +++ b/test/ack/test_qos0.jl @@ -2,7 +2,6 @@ include("../utils.jl") function sub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.shared.msgid["sub_ack"] = msg.id end @@ -12,7 +11,6 @@ end function sub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.PubSubMsg) rb.shared.msgid["sub_pubsub"] = msg.id end @@ -21,7 +19,6 @@ end function pub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.shared.msgid["pub_ack"] = msg.id end @@ -30,8 +27,6 @@ end function pub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" - if isa(msg, Rembus.PubSubMsg) rb.shared.msgid["pub_pubsub"] = msg.id end diff --git a/test/ack/test_qos1.jl b/test/ack/test_qos1.jl index 2858c9b..f1881f6 100644 --- a/test/ack/test_qos1.jl +++ b/test/ack/test_qos1.jl @@ -2,7 +2,6 @@ include("../utils.jl") function sub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.shared.msgid["sub_ack"] = msg.id end @@ -12,7 +11,6 @@ end function sub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.PubSubMsg) rb.shared.msgid["sub_pubsub"] = msg.id elseif isa(msg, Rembus.Ack2Msg) @@ -24,7 +22,6 @@ end function pub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.shared.msgid["pub_ack"] = msg.id end @@ -33,8 +30,6 @@ end function pub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" - if isa(msg, Rembus.PubSubMsg) rb.shared.msgid["pub_pubsub"] = msg.id end diff --git a/test/ack/test_qos2.jl b/test/ack/test_qos2.jl index 56d6f7e..f81d98e 100644 --- a/test/ack/test_qos2.jl +++ b/test/ack/test_qos2.jl @@ -2,7 +2,6 @@ include("../utils.jl") function sub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.shared.msgid["sub_ack"] = msg.id end @@ -12,7 +11,6 @@ end function sub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.PubSubMsg) rb.shared.msgid["sub_pubsub"] = msg.id elseif isa(msg, Rembus.Ack2Msg) @@ -25,7 +23,6 @@ end function pub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.shared.msgid["pub_ack"] = msg.id end @@ -34,8 +31,6 @@ end function pub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" - if isa(msg, Rembus.PubSubMsg) rb.shared.msgid["pub_pubsub"] = msg.id end diff --git a/test/ack/test_server_ack_qos2.jl b/test/ack/test_server_ack_qos2.jl index f9d9c57..8f17fcc 100644 --- a/test/ack/test_server_ack_qos2.jl +++ b/test/ack/test_server_ack_qos2.jl @@ -4,7 +4,6 @@ include("../utils.jl") function sub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.router.shared.msgid["sub_ack"] = msg.id end @@ -14,7 +13,6 @@ end function sub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.PubSubMsg) rb.router.shared.msgid["sub_pubsub"] = msg.id elseif isa(msg, Rembus.Ack2Msg) @@ -25,7 +23,6 @@ end function pub_ingress(rb, msg) response = msg - @info "[$rb] ingress: $(msg.id) ($(typeof(msg)))" if isa(msg, Rembus.AckMsg) rb.shared.msgid["pub_ack"] = msg.id end @@ -34,8 +31,6 @@ end function pub_egress(rb, msg) response = msg - @info "[$rb] egress: $(msg.id) ($(typeof(msg)))" - if isa(msg, Rembus.PubSubMsg) rb.shared.msgid["pub_pubsub"] = msg.id end @@ -71,7 +66,7 @@ end function run() ctx = Ctx() - topic = "qos2_topic" + topic = "server_ack_qos2_topic" rb = sub(topic, ctx) pub_rb = pub(topic, ctx)