From ae90e378d3da90cdbf640edb08c0c4ed910ad036 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Tue, 19 Nov 2024 22:01:48 +0500 Subject: [PATCH 1/6] #636 - JetStream Batch Get Client support * Added batch get contract --- src/NATS.Client.JetStream/INatsJSStream.cs | 10 +++ .../Models/StreamMsgBatchGetRequest.cs | 80 +++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs diff --git a/src/NATS.Client.JetStream/INatsJSStream.cs b/src/NATS.Client.JetStream/INatsJSStream.cs index 9fd3734df..7b041304d 100644 --- a/src/NATS.Client.JetStream/INatsJSStream.cs +++ b/src/NATS.Client.JetStream/INatsJSStream.cs @@ -112,5 +112,15 @@ ValueTask UpdateAsync( ValueTask> GetDirectAsync(StreamMsgGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + /// + /// Request a direct batch message + /// + /// Batch message request. + /// Serializer to use for the message type. + /// A used to cancel the API call. + /// + /// + IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + ValueTask GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default); } diff --git a/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs b/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs new file mode 100644 index 000000000..14d863ae3 --- /dev/null +++ b/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs @@ -0,0 +1,80 @@ +using System.ComponentModel.DataAnnotations; +using System.Text.Json.Serialization; + +namespace NATS.Client.JetStream.Models; + +/// +/// A request to the JetStream $JS.API.STREAM.MSG.GET API +/// +public record StreamMsgBatchGetRequest +{ + /// + /// The maximum amount of messages to be returned for this request + /// + [JsonPropertyName("batch")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [Range(-1, int.MaxValue)] + public int Batch { get; set; } + + /// + /// The maximum amount of returned bytes for this request. + /// + [JsonPropertyName("max_bytes")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [Range(-1, int.MaxValue)] + public int MaxBytes { get; set; } + + /// + /// The minimum sequence for returned message + /// + [JsonPropertyName("seq")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [Range(ulong.MinValue, ulong.MaxValue)] + public ulong MinSequence { get; set; } + + /// + /// The minimum start time for returned message + /// + [JsonPropertyName("start_time")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [Range(ulong.MinValue, ulong.MaxValue)] + public DateTime StartTime { get; set; } + + /// + /// The subject used filter messages that should be returned + /// + [JsonPropertyName("next_by_subj")] + [JsonIgnore(Condition = JsonIgnoreCondition.Never)] + [Required] +#if NET6_0 + public string Subject { get; set; } = default!; +#else +#pragma warning disable SA1206 + public required string Subject { get; set; } + +#pragma warning restore SA1206 +#endif + + /// + /// Return last messages mathing the subjects + /// + [JsonPropertyName("multi_last")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [Range(ulong.MinValue, ulong.MaxValue)] + public string[] LastBySubjects { get; set; } = []; + + /// + /// Return message after sequence + /// + [JsonPropertyName("up_to_seq")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + [Range(ulong.MinValue, ulong.MaxValue)] + public ulong UpToSequence { get; set; } + + /// + /// Return message after time + /// + [JsonPropertyName("up_to_time")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTimeOffset UpToTime { get; set; } +} From 075f749e31d06a5bd41099a7bd9893493f0927dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Sat, 23 Nov 2024 10:58:29 +0500 Subject: [PATCH 2/6] #636 - JetStream Batch Get Client support * Added implementation GetBatchDirectAsync --- src/NATS.Client.JetStream/INatsJSStream.cs | 6 ++--- src/NATS.Client.JetStream/NatsJSStream.cs | 29 ++++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/NATS.Client.JetStream/INatsJSStream.cs b/src/NATS.Client.JetStream/INatsJSStream.cs index 7b041304d..840de8dd3 100644 --- a/src/NATS.Client.JetStream/INatsJSStream.cs +++ b/src/NATS.Client.JetStream/INatsJSStream.cs @@ -117,10 +117,10 @@ ValueTask UpdateAsync( /// /// Batch message request. /// Serializer to use for the message type. + /// true to send the last empty message with eobCode in the header; otherwise false /// A used to cancel the API call. - /// - /// - IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + /// There was an issue, stream must have allow direct set. + IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default); ValueTask GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default); } diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 54f8b63e0..50ab3da2f 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -181,6 +181,27 @@ public ValueTask> GetDirectAsync(StreamMsgGetRequest request, INat cancellationToken: cancellationToken); } + /// + /// Request a direct batch message + /// + /// Batch message request. + /// Serializer to use for the message type. + /// true to send the last empty message with eobCode in the header; otherwise false + /// A used to cancel the API call. + /// There was an issue, stream must have allow direct set. + public IAsyncEnumerable> GetBatchDirectAsync(StreamMsgBatchGetRequest request, INatsDeserialize? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default) + { + ValidateStream(); + + return _context.Connection.RequestManyAsync( + subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}", + data: request, + requestSerializer: NatsJSJsonSerializer.Default, + replySerializer: serializer, + replyOpts: new NatsSubOpts() { StopOnEmptyMsg = !includeEob, ThrowIfNoResponders = true }, + cancellationToken: cancellationToken); + } + public ValueTask GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) => _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}", @@ -192,4 +213,12 @@ private void ThrowIfDeleted() if (_deleted) throw new NatsJSException($"Stream '{_name}' is deleted"); } + + private void ValidateStream() + { + if (!Info.Config.AllowDirect) + { + throw new InvalidOperationException("StreamMsgBatchGetRequest is not permitted when AllowDirect on stream disable"); + } + } } From 95a495f412d6222d7e5c829bdbfb5b8da83b638a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Sat, 23 Nov 2024 11:01:59 +0500 Subject: [PATCH 3/6] #636 - JetStream Batch Get Client support * Added eod parse in headers --- src/NATS.Client.Core/NatsHeaderParser.cs | 5 +++++ src/NATS.Client.Core/NatsHeaders.cs | 5 +++++ tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs | 1 + 3 files changed, 11 insertions(+) diff --git a/src/NATS.Client.Core/NatsHeaderParser.cs b/src/NATS.Client.Core/NatsHeaderParser.cs index e26077aa5..01f76ef48 100644 --- a/src/NATS.Client.Core/NatsHeaderParser.cs +++ b/src/NATS.Client.Core/NatsHeaderParser.cs @@ -119,6 +119,11 @@ private bool TryParseHeaderLine(ReadOnlySpan headerLine, NatsHeaders heade headers.Message = NatsHeaders.Messages.MessageSizeExceedsMaxBytes; headers.MessageText = NatsHeaders.MessageMessageSizeExceedsMaxBytesStr; } + else if (headerLine.SequenceEqual(NatsHeaders.MessageEobCode)) + { + headers.Message = NatsHeaders.Messages.EobCode; + headers.MessageText = NatsHeaders.MessageEobCodeStr; + } else { headers.Message = NatsHeaders.Messages.Text; diff --git a/src/NATS.Client.Core/NatsHeaders.cs b/src/NATS.Client.Core/NatsHeaders.cs index f7bec3f67..0521d431a 100644 --- a/src/NATS.Client.Core/NatsHeaders.cs +++ b/src/NATS.Client.Core/NatsHeaders.cs @@ -25,6 +25,7 @@ public enum Messages NoMessages, RequestTimeout, MessageSizeExceedsMaxBytes, + EobCode, } // Uses C# compiler's optimization for static byte[] data @@ -56,6 +57,10 @@ public enum Messages internal static ReadOnlySpan MessageMessageSizeExceedsMaxBytes => new byte[] { 77, 101, 115, 115, 97, 103, 101, 32, 83, 105, 122, 101, 32, 69, 120, 99, 101, 101, 100, 115, 32, 77, 97, 120, 66, 121, 116, 101, 115 }; internal static readonly string MessageMessageSizeExceedsMaxBytesStr = "Message Size Exceeds MaxBytes"; + // EOB + internal static ReadOnlySpan MessageEobCode => new byte[] { 69, 79, 66 }; + internal static readonly string MessageEobCodeStr = "EOB"; + private static readonly string[] EmptyKeys = Array.Empty(); private static readonly StringValues[] EmptyValues = Array.Empty(); diff --git a/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs b/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs index 240a0700e..9d12ffb4b 100644 --- a/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs @@ -96,6 +96,7 @@ public void ParserTests() [InlineData("Request Timeout", NatsHeaders.Messages.RequestTimeout)] [InlineData("Message Size Exceeds MaxBytes", NatsHeaders.Messages.MessageSizeExceedsMaxBytes)] [InlineData("test message", NatsHeaders.Messages.Text)] + [InlineData("EOB", NatsHeaders.Messages.EobCode)] public void ParserMessageEnumTests(string message, NatsHeaders.Messages result) { var parser = new NatsHeaderParser(Encoding.UTF8); From c6338690511483ef989507e6a095c55074993b03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Sat, 23 Nov 2024 11:03:26 +0500 Subject: [PATCH 4/6] #636 - JetStream Batch Get Client support * Modify StreamMsgBatchGetRequest --- src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs | 4 +--- src/NATS.Client.JetStream/NatsJSJsonSerializer.cs | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs b/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs index 14d863ae3..7ad84a6cf 100644 --- a/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs +++ b/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs @@ -37,8 +37,7 @@ public record StreamMsgBatchGetRequest /// [JsonPropertyName("start_time")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] - [Range(ulong.MinValue, ulong.MaxValue)] - public DateTime StartTime { get; set; } + public DateTimeOffset StartTime { get; set; } /// /// The subject used filter messages that should be returned @@ -60,7 +59,6 @@ public record StreamMsgBatchGetRequest /// [JsonPropertyName("multi_last")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] - [Range(ulong.MinValue, ulong.MaxValue)] public string[] LastBySubjects { get; set; } = []; /// diff --git a/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs b/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs index 375be743f..4994ebb10 100644 --- a/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs +++ b/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs @@ -78,6 +78,7 @@ public static class NatsJSJsonSerializer [JsonSerializable(typeof(StreamMsgDeleteResponse))] [JsonSerializable(typeof(StreamMsgGetRequest))] [JsonSerializable(typeof(StreamMsgGetResponse))] +[JsonSerializable(typeof(StreamMsgBatchGetRequest))] [JsonSerializable(typeof(StreamNamesRequest))] [JsonSerializable(typeof(StreamNamesResponse))] [JsonSerializable(typeof(StreamPurgeRequest))] From 4110d1df72f526a1accd495de8291903b29c1910 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Sat, 23 Nov 2024 11:04:28 +0500 Subject: [PATCH 5/6] #636 - JetStream Batch Get Client support * Added tests for directGetBatch --- .../DirectGetTest.cs | 116 ++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 tests/NATS.Client.JetStream.Tests/DirectGetTest.cs diff --git a/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs b/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs new file mode 100644 index 000000000..7cefb768e --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs @@ -0,0 +1,116 @@ +using NATS.Client.Core.Tests; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Tests; + +public class DirectGetTest +{ + [Fact] + public async Task Direct_get_when_stream_disable() + { + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(); + var cancellationToken = cts.Token; + var streamConfig = new StreamConfig("stream_disable", new[] { "stream_disable.x" }); + + var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); + + async Task GetBatchAction() + { + var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "stream_disable.x" }; + await foreach (var unused in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, cancellationToken: cancellationToken)) + { + } + } + + await Assert.ThrowsAsync(GetBatchAction); + } + + [Fact] + public async Task Direct_get_when_stream_enable() + { + var testDataList = new List(); + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); + var cancellationToken = cts.Token; + var streamConfig = new StreamConfig("stream_enable", new[] { "stream_enable.x" }) { AllowDirect = true }; + + var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); + + for (var i = 0; i < 1; i++) + { + await js.PublishAsync("stream_enable.x", new TestData { Test = i }, TestDataJsonSerializer.Default, cancellationToken: cancellationToken); + } + + var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "stream_enable.x", Batch = 3 }; + await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer.Default, cancellationToken: cancellationToken)) + { + testDataList.Add(msg.Data); + } + + Assert.Single(testDataList); + } + + [Fact] + public async Task Direct_get_with_eobCode() + { + var testDataList = new List(); + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); + var cancellationToken = cts.Token; + var streamConfig = new StreamConfig("eobCode", new[] { "eobCode.x" }) { AllowDirect = true }; + + var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); + + for (var i = 0; i < 1; i++) + { + await js.PublishAsync("eobCode.x", new TestData { Test = i }, TestDataJsonSerializer.Default, cancellationToken: cancellationToken); + } + + var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "eobCode.x", Batch = 3 }; + await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer.Default, includeEob: true, cancellationToken: cancellationToken)) + { + testDataList.Add(msg.Data); + } + + Assert.Equal(2, testDataList.Count); + } + + [Fact] + public async Task Direct_get_min_sequence() + { + var testDataList = new List(); + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); + var cancellationToken = cts.Token; + var streamConfig = new StreamConfig("min_sequence", new[] { "min_sequence.x" }) { AllowDirect = true }; + + var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); + + for (var i = 0; i < 3; i++) + { + await js.PublishAsync("min_sequence.x", new TestData { Test = i }, TestDataJsonSerializer.Default, cancellationToken: cancellationToken); + } + + var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "min_sequence.x", Batch = 1, MinSequence = 3 }; + await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer.Default, cancellationToken: cancellationToken)) + { + testDataList.Add(msg.Data); + } + + Assert.Single(testDataList); + Assert.Equal(2, testDataList[0]?.Test); + } +} From 8e31dda924e18f6ce649ac5eff379163d7a52c6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Sat, 23 Nov 2024 11:34:50 +0500 Subject: [PATCH 6/6] #636 - JetStream Batch Get Client support * Changed tests, set SkipIfNatsServer * Applied dotnet format --- .../Models/StreamMsgBatchGetRequest.cs | 2 +- tests/NATS.Client.JetStream.Tests/DirectGetTest.cs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs b/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs index 7ad84a6cf..400cdb4ce 100644 --- a/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs +++ b/src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs @@ -1,4 +1,4 @@ -using System.ComponentModel.DataAnnotations; +using System.ComponentModel.DataAnnotations; using System.Text.Json.Serialization; namespace NATS.Client.JetStream.Models; diff --git a/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs b/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs index 7cefb768e..7d5c2860a 100644 --- a/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs +++ b/tests/NATS.Client.JetStream.Tests/DirectGetTest.cs @@ -1,11 +1,11 @@ -using NATS.Client.Core.Tests; +using NATS.Client.Core.Tests; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Tests; public class DirectGetTest { - [Fact] + [SkipIfNatsServer(versionEarlierThan: "2.11")] public async Task Direct_get_when_stream_disable() { await using var server = NatsServer.StartJS(); @@ -29,7 +29,7 @@ async Task GetBatchAction() await Assert.ThrowsAsync(GetBatchAction); } - [Fact] + [SkipIfNatsServer(versionEarlierThan: "2.11")] public async Task Direct_get_when_stream_enable() { var testDataList = new List(); @@ -57,7 +57,7 @@ public async Task Direct_get_when_stream_enable() Assert.Single(testDataList); } - [Fact] + [SkipIfNatsServer(versionEarlierThan: "2.11")] public async Task Direct_get_with_eobCode() { var testDataList = new List(); @@ -85,7 +85,7 @@ public async Task Direct_get_with_eobCode() Assert.Equal(2, testDataList.Count); } - [Fact] + [SkipIfNatsServer(versionEarlierThan: "2.11")] public async Task Direct_get_min_sequence() { var testDataList = new List();