From f8c8c9f53427f7258a2933899fa65ab9ade70d44 Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Tue, 11 Feb 2020 19:04:43 -0700 Subject: [PATCH 1/6] fix flapping test Signed-off-by: Colin Sullivan --- src/Tests/IntegrationTests/TestReconnect.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Tests/IntegrationTests/TestReconnect.cs b/src/Tests/IntegrationTests/TestReconnect.cs index 9eda74b56..574246427 100644 --- a/src/Tests/IntegrationTests/TestReconnect.cs +++ b/src/Tests/IntegrationTests/TestReconnect.cs @@ -290,6 +290,7 @@ public void TestQueueSubsOnReconnect() { AutoResetEvent reconnectEvent = new AutoResetEvent(false); Options opts = getReconnectOptions(); + opts.MaxReconnect = 32; string subj = "foo.bar"; string qgroup = "workers"; From 2c4a34bf8c0524e7f7f34c42ab414b5dd5860340 Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Tue, 11 Feb 2020 19:05:09 -0700 Subject: [PATCH 2/6] handle jetstream reply subjects Signed-off-by: Colin Sullivan --- src/NATS.Client/Conn.cs | 35 +++++++++++++++++---- src/Tests/IntegrationTests/TestBasic.cs | 42 ++++++++++++++++++------- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/src/NATS.Client/Conn.cs b/src/NATS.Client/Conn.cs index cd0249f4b..d57e6f48a 100644 --- a/src/NATS.Client/Conn.cs +++ b/src/NATS.Client/Conn.cs @@ -2596,15 +2596,38 @@ public void Publish(string subject, string reply, byte[] data, int offset, int c private void requestResponseHandler(object sender, MsgHandlerEventArgs e) { + InFlightRequest request; + if (e.Message == null) return; - // \ - // \/ - // _INBOX.. - var requestId = e.Message.Subject.Substring(globalRequestInbox.Length + 1); - if (!waitingRequests.TryGetValue(requestId, out var request)) - return; + var subject = e.Message.Subject; + + // if it's a typical response, process normally. + if (subject.StartsWith(globalRequestInbox)) + { + // \ + // \/ + // _INBOX.. + var requestId = subject.Substring(globalRequestInbox.Length + 1); + if (!waitingRequests.TryGetValue(requestId, out request)) + return; + } + else + { + // We have a jetstream subject (remapped), so if there's only one + // request assume we're OK and handle it. + if (waitingRequests.Count == 1) + { + request = waitingRequests.ToArray()[0].Value; + } + else + { + // if we get here, we have multiple outsanding jetstream + // requests. We can't tell which is which we'll punt. + return; + } + } bool isClosed; diff --git a/src/Tests/IntegrationTests/TestBasic.cs b/src/Tests/IntegrationTests/TestBasic.cs index 2f436dc67..b57c3fdd2 100644 --- a/src/Tests/IntegrationTests/TestBasic.cs +++ b/src/Tests/IntegrationTests/TestBasic.cs @@ -517,7 +517,7 @@ public void TestRequestTimeout() { using (IConnection c = Context.OpenConnection()) { - Assert.Throws(() => c.Request("foo", null, 50)); + Assert.Throws(() => c.Request("foo", null, 50)); } Options opts = Context.GetTestOptions(); @@ -1415,7 +1415,7 @@ public void TestUrlArgument() c.Close(); } - using(var c = Context.ConnectionFactory.CreateConnection(url1)) + using (var c = Context.ConnectionFactory.CreateConnection(url1)) { c.Close(); } @@ -1543,7 +1543,7 @@ public void TestAsyncInfoProtocolPrune() opts.Url = $"nats://127.0.0.1:{Context.Server1.Port}"; AutoResetEvent evDS = new AutoResetEvent(false); - opts.ServerDiscoveredEventHandler = (o, a) =>{evDS.Set();}; + opts.ServerDiscoveredEventHandler = (o, a) => { evDS.Set(); }; AutoResetEvent evRC = new AutoResetEvent(false); opts.ReconnectedEventHandler = (o, a) => { evRC.Set(); }; @@ -1609,13 +1609,13 @@ private bool listsEqual(string[] l1, string[] l2) [Fact] public void TestServersRandomize() { - var serverList = new [] { + var serverList = new[] { Context.DefaultServer.Url, "nats://localhost:2", "nats://localhost:3", - "nats://localhost:4", - "nats://localhsot:5", - "nats://localhost:6", + "nats://localhost:4", + "nats://localhsot:5", + "nats://localhost:6", "nats://localhost:7" }; @@ -1681,26 +1681,26 @@ public void TestSimpleUrlArgument() // simple url o.Url = "127.0.0.1"; - using(Context.ConnectionFactory.CreateConnection(o)){} + using (Context.ConnectionFactory.CreateConnection(o)) { } // servers with a simple hostname o.Url = null; o.Servers = new string[] { "127.0.0.1" }; - using(var cn = Context.ConnectionFactory.CreateConnection(o)) + using (var cn = Context.ConnectionFactory.CreateConnection(o)) cn.Close(); // simple url connect - using(var cn = Context.ConnectionFactory.CreateConnection("127.0.0.1, localhost")) + using (var cn = Context.ConnectionFactory.CreateConnection("127.0.0.1, localhost")) cn.Close(); // url with multiple hosts o.Url = "127.0.0.1,localhost"; - using (Context.ConnectionFactory.CreateConnection(o)) {} + using (Context.ConnectionFactory.CreateConnection(o)) { } // servers with multiple hosts o.Url = null; o.Servers = new string[] { "127.0.0.1", "localhost" }; - using(var cn = Context.ConnectionFactory.CreateConnection(o)) + using (var cn = Context.ConnectionFactory.CreateConnection(o)) cn.Close(); } } @@ -1737,5 +1737,23 @@ public void TestServersOption() Assert.ThrowsAny(() => Context.ConnectionFactory.CreateConnection()); } + /// + /// Temporary test for upcoming jetstream functionality. The jetstream server must be run + /// manually and loaded per the issue listed below. JetStream will remap reply subjects + /// for requests, so additional code had to be added to handle this. Future jetstream tests + /// will cover this. + /// + [Fact(Skip = "Manual")] + public void TestJetstreamSubjectHandling() + { + // Start a NATS server with experimental -js feature and create a stream and consumer + // as described in issue https://github.com/nats-io/nats.net/issues/364 + + // publish a message into JS + using var c = new ConnectionFactory().CreateConnection(); + c.Publish("foo.inc", Encoding.UTF8.GetBytes("hello")); + c.Request("$JS.STREAM.foo.CONSUMER.bar.NEXT", Encoding.ASCII.GetBytes("1"), 1000); + } + } // class } // namespace From 7949142f676b3f87409b53222a2085c3753472c0 Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Tue, 11 Feb 2020 19:05:09 -0700 Subject: [PATCH 3/6] handle jetstream reply subjects Signed-off-by: Colin Sullivan --- src/NATS.Client/Conn.cs | 35 +++++++++++++++++---- src/Tests/IntegrationTests/TestBasic.cs | 42 ++++++++++++++++++------- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/src/NATS.Client/Conn.cs b/src/NATS.Client/Conn.cs index cd0249f4b..d57e6f48a 100644 --- a/src/NATS.Client/Conn.cs +++ b/src/NATS.Client/Conn.cs @@ -2596,15 +2596,38 @@ public void Publish(string subject, string reply, byte[] data, int offset, int c private void requestResponseHandler(object sender, MsgHandlerEventArgs e) { + InFlightRequest request; + if (e.Message == null) return; - // \ - // \/ - // _INBOX.. - var requestId = e.Message.Subject.Substring(globalRequestInbox.Length + 1); - if (!waitingRequests.TryGetValue(requestId, out var request)) - return; + var subject = e.Message.Subject; + + // if it's a typical response, process normally. + if (subject.StartsWith(globalRequestInbox)) + { + // \ + // \/ + // _INBOX.. + var requestId = subject.Substring(globalRequestInbox.Length + 1); + if (!waitingRequests.TryGetValue(requestId, out request)) + return; + } + else + { + // We have a jetstream subject (remapped), so if there's only one + // request assume we're OK and handle it. + if (waitingRequests.Count == 1) + { + request = waitingRequests.ToArray()[0].Value; + } + else + { + // if we get here, we have multiple outsanding jetstream + // requests. We can't tell which is which we'll punt. + return; + } + } bool isClosed; diff --git a/src/Tests/IntegrationTests/TestBasic.cs b/src/Tests/IntegrationTests/TestBasic.cs index 2f436dc67..b57c3fdd2 100644 --- a/src/Tests/IntegrationTests/TestBasic.cs +++ b/src/Tests/IntegrationTests/TestBasic.cs @@ -517,7 +517,7 @@ public void TestRequestTimeout() { using (IConnection c = Context.OpenConnection()) { - Assert.Throws(() => c.Request("foo", null, 50)); + Assert.Throws(() => c.Request("foo", null, 50)); } Options opts = Context.GetTestOptions(); @@ -1415,7 +1415,7 @@ public void TestUrlArgument() c.Close(); } - using(var c = Context.ConnectionFactory.CreateConnection(url1)) + using (var c = Context.ConnectionFactory.CreateConnection(url1)) { c.Close(); } @@ -1543,7 +1543,7 @@ public void TestAsyncInfoProtocolPrune() opts.Url = $"nats://127.0.0.1:{Context.Server1.Port}"; AutoResetEvent evDS = new AutoResetEvent(false); - opts.ServerDiscoveredEventHandler = (o, a) =>{evDS.Set();}; + opts.ServerDiscoveredEventHandler = (o, a) => { evDS.Set(); }; AutoResetEvent evRC = new AutoResetEvent(false); opts.ReconnectedEventHandler = (o, a) => { evRC.Set(); }; @@ -1609,13 +1609,13 @@ private bool listsEqual(string[] l1, string[] l2) [Fact] public void TestServersRandomize() { - var serverList = new [] { + var serverList = new[] { Context.DefaultServer.Url, "nats://localhost:2", "nats://localhost:3", - "nats://localhost:4", - "nats://localhsot:5", - "nats://localhost:6", + "nats://localhost:4", + "nats://localhsot:5", + "nats://localhost:6", "nats://localhost:7" }; @@ -1681,26 +1681,26 @@ public void TestSimpleUrlArgument() // simple url o.Url = "127.0.0.1"; - using(Context.ConnectionFactory.CreateConnection(o)){} + using (Context.ConnectionFactory.CreateConnection(o)) { } // servers with a simple hostname o.Url = null; o.Servers = new string[] { "127.0.0.1" }; - using(var cn = Context.ConnectionFactory.CreateConnection(o)) + using (var cn = Context.ConnectionFactory.CreateConnection(o)) cn.Close(); // simple url connect - using(var cn = Context.ConnectionFactory.CreateConnection("127.0.0.1, localhost")) + using (var cn = Context.ConnectionFactory.CreateConnection("127.0.0.1, localhost")) cn.Close(); // url with multiple hosts o.Url = "127.0.0.1,localhost"; - using (Context.ConnectionFactory.CreateConnection(o)) {} + using (Context.ConnectionFactory.CreateConnection(o)) { } // servers with multiple hosts o.Url = null; o.Servers = new string[] { "127.0.0.1", "localhost" }; - using(var cn = Context.ConnectionFactory.CreateConnection(o)) + using (var cn = Context.ConnectionFactory.CreateConnection(o)) cn.Close(); } } @@ -1737,5 +1737,23 @@ public void TestServersOption() Assert.ThrowsAny(() => Context.ConnectionFactory.CreateConnection()); } + /// + /// Temporary test for upcoming jetstream functionality. The jetstream server must be run + /// manually and loaded per the issue listed below. JetStream will remap reply subjects + /// for requests, so additional code had to be added to handle this. Future jetstream tests + /// will cover this. + /// + [Fact(Skip = "Manual")] + public void TestJetstreamSubjectHandling() + { + // Start a NATS server with experimental -js feature and create a stream and consumer + // as described in issue https://github.com/nats-io/nats.net/issues/364 + + // publish a message into JS + using var c = new ConnectionFactory().CreateConnection(); + c.Publish("foo.inc", Encoding.UTF8.GetBytes("hello")); + c.Request("$JS.STREAM.foo.CONSUMER.bar.NEXT", Encoding.ASCII.GetBytes("1"), 1000); + } + } // class } // namespace From a2033f185c8fd66f70cc205f8c6617128e2786a5 Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Wed, 12 Feb 2020 09:27:45 -0700 Subject: [PATCH 4/6] lower language version features for CI --- src/Tests/IntegrationTests/TestBasic.cs | 36 ++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/src/Tests/IntegrationTests/TestBasic.cs b/src/Tests/IntegrationTests/TestBasic.cs index b57c3fdd2..ba5729d7b 100644 --- a/src/Tests/IntegrationTests/TestBasic.cs +++ b/src/Tests/IntegrationTests/TestBasic.cs @@ -1741,7 +1741,33 @@ public void TestServersOption() /// Temporary test for upcoming jetstream functionality. The jetstream server must be run /// manually and loaded per the issue listed below. JetStream will remap reply subjects /// for requests, so additional code had to be added to handle this. Future jetstream tests - /// will cover this. + /// will cover this and this test should be replaced. + /// + /// # Start server with jetstream enabled. + /// $ ./nats-server -js + /// + /// Create the stream + /// $ jsm str create + /// ? Stream Name foo + /// ? Subjects to consume foo.* + /// ? Storage backend memory + /// ? Retention Policy Limits + /// ? Message count limit -1 + /// ? Message size limit -1 + /// ? Maximum message age limit -1 + /// ? Maximum individual message size -1 + /// + /// Create the server side consumer + /// $ jsm con create + /// ? Select a Stream foo + /// ? Consumer name bar + /// ? Delivery target + /// ? Start policy (all, last, 1h, msg sequence) all + /// ? Filter Stream by subject (blank for all) + /// ? Maximum Allowed Deliveries -1 + /// + /// Now manually run this test. + /// /// [Fact(Skip = "Manual")] public void TestJetstreamSubjectHandling() @@ -1750,9 +1776,11 @@ public void TestJetstreamSubjectHandling() // as described in issue https://github.com/nats-io/nats.net/issues/364 // publish a message into JS - using var c = new ConnectionFactory().CreateConnection(); - c.Publish("foo.inc", Encoding.UTF8.GetBytes("hello")); - c.Request("$JS.STREAM.foo.CONSUMER.bar.NEXT", Encoding.ASCII.GetBytes("1"), 1000); + using (var c = new ConnectionFactory().CreateConnection()) + { + c.Publish("foo.inc", Encoding.UTF8.GetBytes("hello")); + c.Request("$JS.STREAM.foo.CONSUMER.bar.NEXT", Encoding.ASCII.GetBytes("1"), 1000); + } } } // class From 13e02cc68fae0b245690349d04f7ffeb4a8db58a Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Wed, 12 Feb 2020 09:28:18 -0700 Subject: [PATCH 5/6] lock request handling Signed-off-by: Colin Sullivan --- src/NATS.Client/Conn.cs | 53 ++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/src/NATS.Client/Conn.cs b/src/NATS.Client/Conn.cs index d57e6f48a..d2b49b845 100644 --- a/src/NATS.Client/Conn.cs +++ b/src/NATS.Client/Conn.cs @@ -2592,47 +2592,46 @@ public void Publish(string subject, string reply, byte[] data, int offset, int c protected Msg request(string subject, byte[] data, int offset, int count, int timeout) => requestSync(subject, data, offset, count, timeout); - private void removeOutstandingRequest(string requestId) => waitingRequests.TryRemove(requestId, out _); + private void RemoveOutstandingRequest(string requestId) => waitingRequests.TryRemove(requestId, out _); - private void requestResponseHandler(object sender, MsgHandlerEventArgs e) + private void RequestResponseHandler(object sender, MsgHandlerEventArgs e) { InFlightRequest request; + bool isClosed; if (e.Message == null) return; var subject = e.Message.Subject; - // if it's a typical response, process normally. - if (subject.StartsWith(globalRequestInbox)) - { - // \ - // \/ - // _INBOX.. - var requestId = subject.Substring(globalRequestInbox.Length + 1); - if (!waitingRequests.TryGetValue(requestId, out request)) - return; - } - else + lock (mu) { - // We have a jetstream subject (remapped), so if there's only one - // request assume we're OK and handle it. - if (waitingRequests.Count == 1) + // if it's a typical response, process normally. + if (subject.StartsWith(globalRequestInbox)) { - request = waitingRequests.ToArray()[0].Value; + // \ + // \/ + // _INBOX.. + var requestId = subject.Substring(globalRequestInbox.Length + 1); + if (!waitingRequests.TryGetValue(requestId, out request)) + return; } else { - // if we get here, we have multiple outsanding jetstream - // requests. We can't tell which is which we'll punt. - return; + // We have a jetstream subject (remapped), so if there's only one + // request assume we're OK and handle it. + if (waitingRequests.Count == 1) + { + request = waitingRequests.ToArray()[0].Value; + } + else + { + // if we get here, we have multiple outsanding jetstream + // requests. We can't tell which is which we'll punt. + return; + } } - } - bool isClosed; - - lock (mu) - { isClosed = this.isClosed(); } @@ -2653,7 +2652,7 @@ private InFlightRequest setupRequest(int timeout, CancellationToken token) if (requestId < 0) //Check if recycled requestId = (requestId + long.MaxValue + 1); - var request = new InFlightRequest(requestId.ToString(CultureInfo.InvariantCulture), token, timeout, removeOutstandingRequest); + var request = new InFlightRequest(requestId.ToString(CultureInfo.InvariantCulture), token, timeout, RemoveOutstandingRequest); request.Waiter.Task.ContinueWith(t => GC.KeepAlive(t.Exception), TaskContinuationOptions.OnlyOnFaulted); waitingRequests.TryAdd(request.Id, request); @@ -2664,7 +2663,7 @@ private InFlightRequest setupRequest(int timeout, CancellationToken token) { if (globalRequestSubscription == null) globalRequestSubscription = subscribeAsync(string.Concat(globalRequestInbox, ".*"), null, - requestResponseHandler); + RequestResponseHandler); } return request; From 4b2694be616c59c6066b36361193df461d30957b Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Wed, 12 Feb 2020 13:34:34 -0700 Subject: [PATCH 6/6] convert request ConcurrentDictionary to Dictionary Signed-off-by: Colin Sullivan --- src/NATS.Client/Conn.cs | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/NATS.Client/Conn.cs b/src/NATS.Client/Conn.cs index d2b49b845..c5769d587 100644 --- a/src/NATS.Client/Conn.cs +++ b/src/NATS.Client/Conn.cs @@ -155,8 +155,8 @@ public Options Opts // used to map replies to requests from client (should lock) private long nextRequestId = 0; - private readonly ConcurrentDictionary waitingRequests - = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary waitingRequests + = new Dictionary(StringComparer.OrdinalIgnoreCase); // Prepare protocol messages for efficiency private byte[] PING_P_BYTES = null; @@ -2592,7 +2592,13 @@ public void Publish(string subject, string reply, byte[] data, int offset, int c protected Msg request(string subject, byte[] data, int offset, int count, int timeout) => requestSync(subject, data, offset, count, timeout); - private void RemoveOutstandingRequest(string requestId) => waitingRequests.TryRemove(requestId, out _); + private void RemoveOutstandingRequest(string requestId) + { + lock (mu) + { + waitingRequests.Remove(requestId); + } + } private void RequestResponseHandler(object sender, MsgHandlerEventArgs e) { @@ -2622,7 +2628,10 @@ private void RequestResponseHandler(object sender, MsgHandlerEventArgs e) // request assume we're OK and handle it. if (waitingRequests.Count == 1) { - request = waitingRequests.ToArray()[0].Value; + InFlightRequest[] values = new InFlightRequest[1]; + waitingRequests.Values.CopyTo(values, 0); + request = values[0]; + } else { @@ -2654,13 +2663,16 @@ private InFlightRequest setupRequest(int timeout, CancellationToken token) var request = new InFlightRequest(requestId.ToString(CultureInfo.InvariantCulture), token, timeout, RemoveOutstandingRequest); request.Waiter.Task.ContinueWith(t => GC.KeepAlive(t.Exception), TaskContinuationOptions.OnlyOnFaulted); - waitingRequests.TryAdd(request.Id, request); - - if (globalRequestSubscription != null) - return request; lock (mu) { + // We shouldn't ever get an Argument exception because the ID is incrementing + // and since this is performant sensitive code, skipping an existence check. + waitingRequests.Add(request.Id, request); + + if (globalRequestSubscription != null) + return request; + if (globalRequestSubscription == null) globalRequestSubscription = subscribeAsync(string.Concat(globalRequestInbox, ".*"), null, RequestResponseHandler); @@ -3639,12 +3651,14 @@ private void clearPendingFlushCalls() // Caller must lock private void clearPendingRequestCalls() { - foreach (var request in waitingRequests) + lock (mu) { - request.Value.Waiter.TrySetCanceled(); + foreach (var request in waitingRequests) + { + request.Value.Waiter.TrySetCanceled(); + } + waitingRequests.Clear(); } - - waitingRequests.Clear(); }