From d2cfbe6bfd0d8e32a32a774f83761e8092b4f972 Mon Sep 17 00:00:00 2001 From: Christopher Watford Date: Tue, 27 Aug 2019 18:44:57 -0400 Subject: [PATCH] Add Message.Respond implementing #281 (#283) * Add Msg.Respond - Adds Respond to Msg to simplify responses to Request-Reply - No longer clears Subscription.conn, until disposing - Add tests for Msg.Respond with both closed connections and lost servers - Throw NATSConnectionClosedException when appropriate --- NATS.Client/AsyncSub.cs | 24 ++++--- NATS.Client/Conn.cs | 6 +- NATS.Client/Msg.cs | 25 +++++++ NATS.Client/Subscription.cs | 33 +++++++-- NATSUnitTests/UnitTestSub.cs | 132 ++++++++++++++++++++++++++++++++++- 5 files changed, 201 insertions(+), 19 deletions(-) diff --git a/NATS.Client/AsyncSub.cs b/NATS.Client/AsyncSub.cs index db384010e..903bb141d 100644 --- a/NATS.Client/AsyncSub.cs +++ b/NATS.Client/AsyncSub.cs @@ -131,13 +131,16 @@ internal void enableAsyncProcessing() internal void disableAsyncProcessing() { - if (msgFeeder != null) + lock (mu) { - mch.close(); - msgFeeder = null; + if (msgFeeder != null) + { + mch.close(); + msgFeeder = null; + } + MessageHandler = null; + started = false; } - MessageHandler = null; - started = false; } /// @@ -155,11 +158,14 @@ public void Start() if (started) return; - if (conn == null) - throw new NATSBadSubscriptionException(); + lock (mu) + { + if (conn == null) + throw new NATSBadSubscriptionException(); - conn.sendSubscriptionMessage(this); - enableAsyncProcessing(); + conn.sendSubscriptionMessage(this); + enableAsyncProcessing(); + } } /// diff --git a/NATS.Client/Conn.cs b/NATS.Client/Conn.cs index 5c764297b..7c6c23b98 100644 --- a/NATS.Client/Conn.cs +++ b/NATS.Client/Conn.cs @@ -3439,8 +3439,9 @@ internal Task unsubscribe(Subscription sub, int max, bool drain, int timeout) if (isClosed()) throw new NATSConnectionClosedException(); - Subscription s = subs[sub.sid]; - if (s == null) + Subscription s; + if (!subs.TryGetValue(sub.sid, out s) + || s == null) { // already unsubscribed return null; @@ -3485,7 +3486,6 @@ internal virtual void removeSub(Subscription s) s.mch = null; } - s.conn = null; s.closed = true; } diff --git a/NATS.Client/Msg.cs b/NATS.Client/Msg.cs index 2e3980291..f61593ff0 100755 --- a/NATS.Client/Msg.cs +++ b/NATS.Client/Msg.cs @@ -167,6 +167,31 @@ public ISubscription ArrivalSubcription get { return sub; } } + /// + /// Send a response to the message on the arrival subscription. + /// + /// The response payload to send. + /// + /// is null or empty. + /// -or- + /// is null. + /// + public void Respond(byte[] data) + { + if (String.IsNullOrEmpty(Reply)) + { + throw new NATSException("No Reply subject"); + } + + Connection conn = ArrivalSubcription?.Connection; + if (conn == null) + { + throw new NATSException("Message is not bound to a subscription"); + } + + conn.Publish(this.Reply, data); + } + /// /// Generates a string representation of the messages. /// diff --git a/NATS.Client/Subscription.cs b/NATS.Client/Subscription.cs index 3467d87db..61346e428 100644 --- a/NATS.Client/Subscription.cs +++ b/NATS.Client/Subscription.cs @@ -222,7 +222,7 @@ public bool IsValid { lock (mu) { - return (conn != null); + return (conn != null) && !closed; } } } @@ -230,9 +230,11 @@ public bool IsValid internal void unsubscribe(bool throwEx) { Connection c; + bool isClosed; lock (mu) { c = this.conn; + isClosed = this.closed; } if (c == null) @@ -243,6 +245,18 @@ internal void unsubscribe(bool throwEx) return; } + if (c.IsClosed()) + { + if (throwEx) + throw new NATSConnectionClosedException(); + } + + if (isClosed) + { + if (throwEx) + throw new NATSBadSubscriptionException(); + } + if (c.IsDraining()) { if (throwEx) @@ -284,6 +298,12 @@ public virtual void AutoUnsubscribe(int max) if (conn == null) throw new NATSBadSubscriptionException(); + if (conn.IsClosed()) + throw new NATSConnectionClosedException(); + + if (closed) + throw new NATSBadSubscriptionException(); + c = conn; } @@ -301,7 +321,7 @@ public int QueuedMessageCount { lock (mu) { - if (conn == null) + if (conn == null || closed) throw new NATSBadSubscriptionException(); return mch.Count; @@ -332,6 +352,9 @@ protected virtual void Dispose(bool disposing) // auto unsubscribing, so ignore. } + conn = null; + closed = true; + disposedValue = true; } } @@ -370,9 +393,8 @@ public override string ToString() private void checkState() { - if (conn == null) + if (conn == null || closed) throw new NATSBadSubscriptionException(); - } /// @@ -573,7 +595,7 @@ internal Task InternalDrain(int timeout) lock (mu) { - if (conn == null) + if (conn == null || closed) throw new NATSBadSubscriptionException(); c = conn; @@ -595,7 +617,6 @@ public Task DrainAsync(int timeout) return InternalDrain(timeout); } - public void Drain() { Drain(Defaults.DefaultDrainTimeout); diff --git a/NATSUnitTests/UnitTestSub.cs b/NATSUnitTests/UnitTestSub.cs index 5a1a4e7d8..9f36160bd 100644 --- a/NATSUnitTests/UnitTestSub.cs +++ b/NATSUnitTests/UnitTestSub.cs @@ -532,7 +532,6 @@ public void TestAsyncSubscriptionPending() } } - [Fact] public void TestAsyncPendingSubscriptionBatchSizeExactlyOne() { @@ -1040,6 +1039,137 @@ public void TestInvalidSubjects() } } } + + [Fact] + public void TestRespond() + { + using (new NATSServer()) + { + using (IConnection c = utils.DefaultTestConnection) + using (ISyncSubscription s = c.SubscribeSync("foo")) + { + string replyTo = c.NewInbox(); + using (ISyncSubscription r = c.SubscribeSync(replyTo)) + { + c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message")); + + Msg m = s.NextMessage(1000); + Assert.NotNull(m); + Assert.Equal(replyTo, m.Reply); + + byte[] reply = Encoding.UTF8.GetBytes("reply"); + m.Respond(reply); + + m = r.NextMessage(1000); + Assert.NotNull(m); + Assert.Equal(replyTo, m.Subject); + Assert.Equal(reply, m.Data); + + s.Unsubscribe(); + r.Unsubscribe(); + } + } + } + } + + [Fact] + public void TestRespondWithAutoUnsubscribe() + { + using (new NATSServer()) + { + using (IConnection c = utils.DefaultTestConnection) + using (ISyncSubscription s = c.SubscribeSync("foo")) + { + s.AutoUnsubscribe(1); + + string replyTo = c.NewInbox(); + using (ISyncSubscription r = c.SubscribeSync(replyTo)) + { + c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message")); + + Msg m = s.NextMessage(1000); + Assert.NotNull(m); + Assert.Equal(replyTo, m.Reply); + + byte[] reply = Encoding.UTF8.GetBytes("reply"); + m.Respond(reply); + + m = r.NextMessage(1000); + Assert.NotNull(m); + Assert.Equal(replyTo, m.Subject); + Assert.Equal(reply, m.Data); + + r.Unsubscribe(); + } + } + } + } + + [Fact] + public void TestRespondFailsWithClosedConnection() + { + using (new NATSServer()) + { + using (IConnection c = utils.DefaultTestConnection) + { + ISyncSubscription s = c.SubscribeSync("foo"); + + string replyTo = c.NewInbox(); + c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message")); + + Msg m = s.NextMessage(1000); + Assert.NotNull(m); + Assert.Equal(replyTo, m.Reply); + + c.Close(); + + byte[] reply = Encoding.UTF8.GetBytes("reply"); + Assert.ThrowsAny(() => m.Respond(reply)); + + s.Dispose(); + } + } + } + + [Fact] + public void TestRespondFailsWithServerClosed() + { + IConnection c = null; + ISyncSubscription s = null; + try + { + Msg m; + using (NATSServer ns = new NATSServer()) + { + Options options = utils.DefaultTestOptions; + options.AllowReconnect = false; + + c = new ConnectionFactory().CreateConnection(options); + s = c.SubscribeSync("foo"); + + string replyTo = c.NewInbox(); + + c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message")); + + m = s.NextMessage(1000); + Assert.NotNull(m); + Assert.Equal(replyTo, m.Reply); + + ns.Shutdown(); + } + + // Give the server time to close + Thread.Sleep(2000); + + byte[] reply = Encoding.UTF8.GetBytes("reply"); + Assert.ThrowsAny(() => m.Respond(reply)); + } + finally + { + c?.Dispose(); + s?.Dispose(); + } + } } }