From 3b7747e431a9f707af6e04758e824f72ba330acb Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 7 Jun 2023 17:04:25 -0400 Subject: [PATCH 1/2] Remove client side check against server info max_payload when publishing --- src/NATS.Client/{Conn.cs => Connection.cs} | 60 +------------------ src/NATS.Client/EncodedConn.cs | 18 +----- src/NATS.Client/Exceptions.cs | 5 +- src/Tests/IntegrationTests/TestBasic.cs | 26 +++++++- src/Tests/IntegrationTests/TestJetStream.cs | 66 ++++++++++++++++++++- 5 files changed, 95 insertions(+), 80 deletions(-) rename src/NATS.Client/{Conn.cs => Connection.cs} (97%) diff --git a/src/NATS.Client/Conn.cs b/src/NATS.Client/Connection.cs similarity index 97% rename from src/NATS.Client/Conn.cs rename to src/NATS.Client/Connection.cs index dc311a29d..b9f06357f 100644 --- a/src/NATS.Client/Conn.cs +++ b/src/NATS.Client/Connection.cs @@ -99,7 +99,7 @@ internal enum ClientProtcolVersion // encoded conn rather than using this class as // a base class. This can happen anytime as we are using // interfaces. - public class Connection : IConnection, IDisposable + public class Connection : IConnection { Statistics stats = new Statistics(); @@ -2597,10 +2597,6 @@ internal void PublishImpl(string subject, string reply, MsgHeader inHeaders, byt if (isDrainingPubs()) throw new NATSConnectionDrainingException(); - // Proactively reject payloads over the threshold set by server. - if (count > info.MaxPayload) - throw new NATSMaxPayloadException(); - if (lastEx != null) throw lastEx; @@ -2680,8 +2676,6 @@ internal void PublishImpl(string subject, string reply, MsgHeader inHeaders, byt /// to the connected NATS server. /// is /// null or entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// There was an unexpected exception performing an internal NATS call /// while publishing. See for more details. @@ -2701,8 +2695,6 @@ public void Publish(string subject, byte[] data) /// to the connected NATS server. /// is /// null or entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// There was an unexpected exception performing an internal NATS call /// while publishing. See for more details. @@ -2755,8 +2747,6 @@ public void Publish(string subject, MsgHeader headers, byte[] data, int offset, /// to the connected NATS server. /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// There was an unexpected exception performing an internal NATS call /// while publishing. See for more details. @@ -2777,8 +2767,6 @@ public void Publish(string subject, string reply, byte[] data) /// to the connected NATS server. /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// There was an unexpected exception performing an internal NATS call /// while publishing. See for more details. @@ -2832,8 +2820,6 @@ public void Publish(string subject, string reply, MsgHeader headers, byte[] data /// is null. /// The property of /// is null or entirely whitespace. - /// The property of - /// exceeds the maximum payload size supported by the NATS server. /// The is closed. /// There was an unexpected exception performing an internal NATS call /// while publishing. See for more details. @@ -3050,8 +3036,6 @@ private Msg OldRequestImpl(string subject, MsgHeader headers, byte[] data, int o /// (0). /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3085,8 +3069,6 @@ public Msg Request(string subject, byte[] data, int timeout) /// (0). /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3166,8 +3148,6 @@ public Msg Request(string subject, MsgHeader headers, byte[] data, int offset, i /// A with the response from the NATS server. /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3197,8 +3177,6 @@ public Msg Request(string subject, byte[] data) /// A with the response from the NATS server. /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3229,16 +3207,12 @@ public Msg Request(string subject, MsgHeader headers, byte[] data) /// the current connection. /// An array of type that contains the request data to publish /// to the connected NATS server. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The zero-based byte offset in at which to begin publishing /// bytes to the subject. /// The number of bytes to be published to the subject. /// A with the response from the NATS server. /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3270,16 +3244,12 @@ public Msg Request(string subject, byte[] data, int offset, int count) /// Optional headers to publish with the message. /// An array of type that contains the request data to publish /// to the connected NATS server. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The zero-based byte offset in at which to begin publishing /// bytes to the subject. /// The number of bytes to be published to the subject. /// A with the response from the NATS server. /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3311,8 +3281,6 @@ public Msg Request(string subject, MsgHeader headers, byte[] data, int offset, i /// is null. /// The subject is null /// or entirely whitespace. - /// payload exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3344,8 +3312,6 @@ public Msg Request(Msg message, int timeout) /// is null. /// The subject is null /// or entirely whitespace. - /// payload exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3462,8 +3428,6 @@ private Task OldRequestAsyncImpl(string subject, MsgHeader headers, byte[] /// (0). /// is null /// or entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3500,8 +3464,6 @@ public Task RequestAsync(string subject, byte[] data, int timeout) /// (0). /// is null /// or entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3587,8 +3549,6 @@ public Task RequestAsync(string subject, MsgHeader headers, byte[] data, in /// server. /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3622,8 +3582,6 @@ public Task RequestAsync(string subject, byte[] data) /// server. /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3713,8 +3671,6 @@ public Task RequestAsync(string subject, MsgHeader headers, byte[] data, in /// (0). /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3754,8 +3710,6 @@ public Task RequestAsync(string subject, byte[] data, int timeout, Cancella /// (0). /// is null or /// entirely whitespace. - /// exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3790,8 +3744,6 @@ public Task RequestAsync(string subject, MsgHeader headers, byte[] data, in /// server. /// is null /// or entirely whitespace. - /// exceeds the maximum payload size supported - /// by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3827,8 +3779,6 @@ public Task RequestAsync(string subject, byte[] data, CancellationToken tok /// server. /// is null /// or entirely whitespace. - /// exceeds the maximum payload size supported - /// by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3918,8 +3868,6 @@ public Task RequestAsync(string subject, MsgHeader headers, byte[] data, in /// is null. /// The subject is null /// or entirely whitespace. - /// payload exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3955,8 +3903,6 @@ public Task RequestAsync(Msg message, int timeout) /// is null. /// The subject is null /// or entirely whitespace. - /// payload exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -3998,8 +3944,6 @@ public Task RequestAsync(Msg message) /// is null. /// The subject is null /// or entirely whitespace. - /// payload exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -4037,8 +3981,6 @@ public Task RequestAsync(Msg message, int timeout, CancellationToken token) /// is null. /// The subject is null /// or entirely whitespace. - /// payload exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. diff --git a/src/NATS.Client/EncodedConn.cs b/src/NATS.Client/EncodedConn.cs index 036ee478e..575784ef3 100644 --- a/src/NATS.Client/EncodedConn.cs +++ b/src/NATS.Client/EncodedConn.cs @@ -180,8 +180,6 @@ private void PublishObjectImpl(string subject, string reply, MsgHeader headers, /// The to serialize and publish to the connected NATS server. /// is /// null or entirely whitespace. - /// The serialized form of exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// is null. /// -or- @@ -202,8 +200,6 @@ public void Publish(string subject, Object obj) /// The to serialize and publish to the connected NATS server. /// is /// null or entirely whitespace. - /// The serialized form of exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// is null. /// -or- @@ -224,8 +220,6 @@ public void Publish(string subject, MsgHeader headers, Object obj) /// The to serialize and publish to the connected NATS server. /// is null or /// entirely whitespace. - /// The serialized form of exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// is null. /// -or- @@ -247,8 +241,6 @@ public void Publish(string subject, string reply, object obj) /// The to serialize and publish to the connected NATS server. /// is null or /// entirely whitespace. - /// The serialized form of exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// is null. /// -or- @@ -400,8 +392,6 @@ private object RequestObjectImpl(string subject, MsgHeader headers, object obj, /// (0). /// is null or /// entirely whitespace. - /// The serialized form of exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -433,8 +423,6 @@ public object Request(string subject, object obj, int timeout) /// (0). /// is null or /// entirely whitespace. - /// The serialized form of exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -461,8 +449,6 @@ public object Request(string subject, MsgHeader headers, object obj, int timeout /// A with the deserialized response from the NATS server. /// is null or /// entirely whitespace. - /// The serialized form of exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -490,8 +476,6 @@ public object Request(string subject, object obj) /// A with the deserialized response from the NATS server. /// is null or /// entirely whitespace. - /// The serialized form of exceeds the maximum payload size - /// supported by the NATS server. /// The is closed. /// A timeout occurred while sending the request or receiving the /// response. @@ -572,4 +556,4 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } } -} \ No newline at end of file +} diff --git a/src/NATS.Client/Exceptions.cs b/src/NATS.Client/Exceptions.cs index 17a4e061e..b3f04c5a4 100644 --- a/src/NATS.Client/Exceptions.cs +++ b/src/NATS.Client/Exceptions.cs @@ -119,6 +119,7 @@ public NATSStaleConnectionException() : base("Connection is stale.") { } /// The exception that is thrown when a message payload exceeds what /// the maximum configured. /// + [Obsolete("This exception is no longer thrown.", false)] public class NATSMaxPayloadException : NATSException { public NATSMaxPayloadException() : base("Maximum payload size has been exceeded") { } @@ -197,8 +198,8 @@ public class NATSNoRespondersException : NATSTimeoutException /// public class NATSJetStreamStatusException : NATSException { - private Subscription Sub { get; } - private MsgStatus Status { get; } + public Subscription Sub { get; } + public MsgStatus Status { get; } public NATSJetStreamStatusException(MsgStatus status, Subscription sub = null) : base($"{(status == null ? "Unknown or unprocessed status message" : status.Message)}") diff --git a/src/Tests/IntegrationTests/TestBasic.cs b/src/Tests/IntegrationTests/TestBasic.cs index 0c44a1c14..ab95c4afa 100644 --- a/src/Tests/IntegrationTests/TestBasic.cs +++ b/src/Tests/IntegrationTests/TestBasic.cs @@ -2082,7 +2082,31 @@ public void TestMessageHeaderNoServerSupport() c.Publish(m); } } - + + [Fact] + public void TestMaxPayload() + { + Assert.Throws(() => + { + using (NATSServer.CreateFast()) + { + var opts = Context.GetTestOptions(); + opts.AllowReconnect = false; + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + long size = 0; + long maxPayload = c.ServerInfo.MaxPayload; + for (int x = -1; x < 10; x++) + { + size = maxPayload + x; + c.Publish("mp", new byte[size]); + Thread.Sleep(100); + } + } + } + }); + } + } // class } // namespace diff --git a/src/Tests/IntegrationTests/TestJetStream.cs b/src/Tests/IntegrationTests/TestJetStream.cs index 62a150a25..998849e58 100644 --- a/src/Tests/IntegrationTests/TestJetStream.cs +++ b/src/Tests/IntegrationTests/TestJetStream.cs @@ -937,6 +937,70 @@ public void TestMoreCreateSubscriptionErrors() Assert.Contains(JsSubFcHbNotValidQueue.Id, e.Message); }); } - + + [Fact] + public void TestMaxPayloadJs() + { + string streamName = "stream-max-payload-test"; + string subject = "mptest"; + + Options opts = ConnectionFactory.GetDefaultOptions(); + opts.AllowReconnect = false; + NATSServer.QuietOptionsModifier.Invoke(opts); + + using (NATSServer.CreateJetStreamFast()) + { + Assert.Throws(() => + { + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); + try + { + jsm.DeleteStream(streamName); + } + catch (NATSJetStreamException) + { + } + + jsm.AddStream(StreamConfiguration.Builder() + .WithName(streamName) + .WithStorageType(StorageType.Memory) + .WithSubjects(subject) + .WithMaxMsgSize(1000) + .Build() + ); + + IJetStream js = c.CreateJetStreamContext(); + long size = 0; + for (int x = -1; x < 10; x++) + { + size = 1000 + x; + js.Publish("mptest", new byte[size]); + Thread.Sleep(100); + } + } + }); + + bool receivedDisconnect = false; + opts.DisconnectedEventHandler = (sender, args) => + { + receivedDisconnect = true; + }; + + using (var c = Context.ConnectionFactory.CreateConnection(opts)) + { + IJetStream js = c.CreateJetStreamContext(); + long size = 0; + for (int x = -1; x < 10; x++) + { + size = 1000 + x; + js.PublishAsync("mptest", new byte[size]); + Thread.Sleep(100); + } + } + Assert.True(receivedDisconnect); + } + } } } From 64a26bb990008f8887d15a377f204bd36a6e4b91 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 7 Jun 2023 17:10:39 -0400 Subject: [PATCH 2/2] changing this is a different pr --- src/NATS.Client/Exceptions.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NATS.Client/Exceptions.cs b/src/NATS.Client/Exceptions.cs index b3f04c5a4..91b70e24a 100644 --- a/src/NATS.Client/Exceptions.cs +++ b/src/NATS.Client/Exceptions.cs @@ -198,8 +198,8 @@ public class NATSNoRespondersException : NATSTimeoutException /// public class NATSJetStreamStatusException : NATSException { - public Subscription Sub { get; } - public MsgStatus Status { get; } + private Subscription Sub { get; } + private MsgStatus Status { get; } public NATSJetStreamStatusException(MsgStatus status, Subscription sub = null) : base($"{(status == null ? "Unknown or unprocessed status message" : status.Message)}")