Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support direct get batch API #680

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/NATS.Client.Core/NatsHeaderParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ private bool TryParseHeaderLine(ReadOnlySpan<byte> headerLine, NatsHeaders heade
headers.Message = NatsHeaders.Messages.MessageSizeExceedsMaxBytes;
headers.MessageText = NatsHeaders.MessageMessageSizeExceedsMaxBytesStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageEobCode))
{
headers.Message = NatsHeaders.Messages.EobCode;
headers.MessageText = NatsHeaders.MessageEobCodeStr;
}
else
{
headers.Message = NatsHeaders.Messages.Text;
Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client.Core/NatsHeaders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public enum Messages
NoMessages,
RequestTimeout,
MessageSizeExceedsMaxBytes,
EobCode,
}

// Uses C# compiler's optimization for static byte[] data
Expand Down Expand Up @@ -56,6 +57,10 @@ public enum Messages
internal static ReadOnlySpan<byte> MessageMessageSizeExceedsMaxBytes => new byte[] { 77, 101, 115, 115, 97, 103, 101, 32, 83, 105, 122, 101, 32, 69, 120, 99, 101, 101, 100, 115, 32, 77, 97, 120, 66, 121, 116, 101, 115 };
internal static readonly string MessageMessageSizeExceedsMaxBytesStr = "Message Size Exceeds MaxBytes";

// EOB
internal static ReadOnlySpan<byte> MessageEobCode => new byte[] { 69, 79, 66 };
internal static readonly string MessageEobCodeStr = "EOB";

private static readonly string[] EmptyKeys = Array.Empty<string>();
private static readonly StringValues[] EmptyValues = Array.Empty<StringValues>();

Expand Down
10 changes: 10 additions & 0 deletions src/NATS.Client.JetStream/INatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,15 @@ ValueTask UpdateAsync(

ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Request a direct batch message
/// </summary>
/// <param name="request">Batch message request.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default);

ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default);
}
78 changes: 78 additions & 0 deletions src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System.ComponentModel.DataAnnotations;
using System.Text.Json.Serialization;

namespace NATS.Client.JetStream.Models;

/// <summary>
/// A request to the JetStream $JS.API.STREAM.MSG.GET API
/// </summary>
public record StreamMsgBatchGetRequest
{
/// <summary>
/// The maximum amount of messages to be returned for this request
/// </summary>
[JsonPropertyName("batch")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(-1, int.MaxValue)]
public int Batch { get; set; }

/// <summary>
/// The maximum amount of returned bytes for this request.
/// </summary>
[JsonPropertyName("max_bytes")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(-1, int.MaxValue)]
public int MaxBytes { get; set; }

/// <summary>
/// The minimum sequence for returned message
/// </summary>
[JsonPropertyName("seq")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(ulong.MinValue, ulong.MaxValue)]
public ulong MinSequence { get; set; }

/// <summary>
/// The minimum start time for returned message
/// </summary>
[JsonPropertyName("start_time")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTimeOffset StartTime { get; set; }

/// <summary>
/// The subject used filter messages that should be returned
/// </summary>
[JsonPropertyName("next_by_subj")]
[JsonIgnore(Condition = JsonIgnoreCondition.Never)]
[Required]
#if NET6_0
public string Subject { get; set; } = default!;
#else
#pragma warning disable SA1206
public required string Subject { get; set; }

#pragma warning restore SA1206
#endif

/// <summary>
/// Return last messages mathing the subjects
/// </summary>
[JsonPropertyName("multi_last")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string[] LastBySubjects { get; set; } = [];

/// <summary>
/// Return message after sequence
/// </summary>
[JsonPropertyName("up_to_seq")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(ulong.MinValue, ulong.MaxValue)]
public ulong UpToSequence { get; set; }

/// <summary>
/// Return message after time
/// </summary>
[JsonPropertyName("up_to_time")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTimeOffset UpToTime { get; set; }
}
1 change: 1 addition & 0 deletions src/NATS.Client.JetStream/NatsJSJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static class NatsJSJsonSerializer<T>
[JsonSerializable(typeof(StreamMsgDeleteResponse))]
[JsonSerializable(typeof(StreamMsgGetRequest))]
[JsonSerializable(typeof(StreamMsgGetResponse))]
[JsonSerializable(typeof(StreamMsgBatchGetRequest))]
[JsonSerializable(typeof(StreamNamesRequest))]
[JsonSerializable(typeof(StreamNamesResponse))]
[JsonSerializable(typeof(StreamPurgeRequest))]
Expand Down
29 changes: 29 additions & 0 deletions src/NATS.Client.JetStream/NatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,27 @@ public ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INat
cancellationToken: cancellationToken);
}

/// <summary>
/// Request a direct batch message
/// </summary>
/// <param name="request">Batch message request.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default)
{
ValidateStream();

return _context.Connection.RequestManyAsync<StreamMsgBatchGetRequest, T>(
subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}",
data: request,
requestSerializer: NatsJSJsonSerializer<StreamMsgBatchGetRequest>.Default,
replySerializer: serializer,
replyOpts: new NatsSubOpts() { StopOnEmptyMsg = !includeEob, ThrowIfNoResponders = true },
cancellationToken: cancellationToken);
}

public ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) =>
_context.JSRequestResponseAsync<StreamMsgGetRequest, StreamMsgGetResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}",
Expand All @@ -192,4 +213,12 @@ private void ThrowIfDeleted()
if (_deleted)
throw new NatsJSException($"Stream '{_name}' is deleted");
}

private void ValidateStream()
{
if (!Info.Config.AllowDirect)
{
throw new InvalidOperationException("StreamMsgBatchGetRequest is not permitted when AllowDirect on stream disable");
}
}
}
1 change: 1 addition & 0 deletions tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void ParserTests()
[InlineData("Request Timeout", NatsHeaders.Messages.RequestTimeout)]
[InlineData("Message Size Exceeds MaxBytes", NatsHeaders.Messages.MessageSizeExceedsMaxBytes)]
[InlineData("test message", NatsHeaders.Messages.Text)]
[InlineData("EOB", NatsHeaders.Messages.EobCode)]
public void ParserMessageEnumTests(string message, NatsHeaders.Messages result)
{
var parser = new NatsHeaderParser(Encoding.UTF8);
Expand Down
116 changes: 116 additions & 0 deletions tests/NATS.Client.JetStream.Tests/DirectGetTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using NATS.Client.Core.Tests;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Tests;

public class DirectGetTest
{
[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Direct_get_when_stream_disable()
{
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("stream_disable", new[] { "stream_disable.x" });

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

async Task GetBatchAction()
{
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "stream_disable.x" };
await foreach (var unused in stream.GetBatchDirectAsync<string>(streamMsgBatchGetRequest, cancellationToken: cancellationToken))
{
}
}

await Assert.ThrowsAsync<InvalidOperationException>(GetBatchAction);
}

[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Direct_get_when_stream_enable()
{
var testDataList = new List<TestData?>();
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("stream_enable", new[] { "stream_enable.x" }) { AllowDirect = true };

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

for (var i = 0; i < 1; i++)
{
await js.PublishAsync("stream_enable.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
}

var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "stream_enable.x", Batch = 3 };
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken))
{
testDataList.Add(msg.Data);
}

Assert.Single(testDataList);
}

[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Direct_get_with_eobCode()
{
var testDataList = new List<TestData?>();
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("eobCode", new[] { "eobCode.x" }) { AllowDirect = true };

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

for (var i = 0; i < 1; i++)
{
await js.PublishAsync("eobCode.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
}

var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "eobCode.x", Batch = 3 };
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, includeEob: true, cancellationToken: cancellationToken))
{
testDataList.Add(msg.Data);
}

Assert.Equal(2, testDataList.Count);
}

[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Direct_get_min_sequence()
{
var testDataList = new List<TestData?>();
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("min_sequence", new[] { "min_sequence.x" }) { AllowDirect = true };

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

for (var i = 0; i < 3; i++)
{
await js.PublishAsync("min_sequence.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
}

var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "min_sequence.x", Batch = 1, MinSequence = 3 };
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken))
{
testDataList.Add(msg.Data);
}

Assert.Single(testDataList);
Assert.Equal(2, testDataList[0]?.Test);
}
}