Skip to content

Commit

Permalink
Implement Msg DateTime and Sequence only in JetStream
Browse files Browse the repository at this point in the history
  • Loading branch information
simonhoss committed Sep 30, 2023
1 parent 3c5abcc commit 76ff2bd
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 61 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ generated_code = true

[*.cs]
indent_size = 4
max_line_length = 300

# changes from VS2019 defaults
csharp_style_namespace_declarations = file_scoped
Expand Down
15 changes: 0 additions & 15 deletions src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs

This file was deleted.

29 changes: 2 additions & 27 deletions src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

Expand All @@ -25,8 +24,6 @@ namespace NATS.Client.Core;
public readonly record struct NatsMsg(
string Subject,
string? ReplyTo,
long? Seq,
DateTimeOffset? DateTime,
int Size,
NatsHeaders? Headers,
ReadOnlyMemory<byte> Data,
Expand All @@ -53,22 +50,12 @@ internal static NatsMsg Build(
headers.SetReadOnly();
}

long? seq = null;
DateTimeOffset? dateTime = null;

if (replyTo != null)
{
var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo);
seq = dateTimeAndSeq.Seq;
dateTime = dateTimeAndSeq.DateTime;
}

var size = subject.Length
+ (replyTo?.Length ?? 0)
+ (headersBuffer?.Length ?? 0)
+ payloadBuffer.Length;

return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, payloadBuffer.ToArray(), connection);
return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection);
}

/// <summary>
Expand Down Expand Up @@ -142,8 +129,6 @@ private void CheckReplyPreconditions()
public readonly record struct NatsMsg<T>(
string Subject,
string? ReplyTo,
long? Seq,
DateTimeOffset? DateTime,
int Size,
NatsHeaders? Headers,
T? Data,
Expand Down Expand Up @@ -178,22 +163,12 @@ internal static NatsMsg<T> Build(
headers.SetReadOnly();
}

long? seq = null;
DateTimeOffset? dateTime = null;

if (replyTo != null)
{
var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo);
seq = dateTimeAndSeq.Seq;
dateTime = dateTimeAndSeq.DateTime;
}

var size = subject.Length
+ (replyTo?.Length ?? 0)
+ (headersBuffer?.Length ?? 0)
+ payloadBuffer.Length;

return new NatsMsg<T>(subject, replyTo, seq, dateTime, (int) size, headers, data, connection);
return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection);
}

/// <summary>
Expand Down
28 changes: 28 additions & 0 deletions src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace NATS.Client.JetStream.Internal;

internal static class ReplyToDateTimeAndSeq
{
internal static (DateTimeOffset? DateTime, long? Seq) Parse(string? reply)
{
if (reply == null)
{
return (null, null);
}

var ackSeperated = reply.Split(".");

// It must be seperated by 9 dots as defined in
// https://docs.nats.io/reference/reference-protocols/nats_api_reference#acknowledging-messages
if (ackSeperated.Length != 9)
{
return (null, null);
}

var timestamp = long.Parse(ackSeperated[^2]);
var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000);
var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero);
var seq = long.Parse(ackSeperated[5]);

return (dateTime, seq);
}
}
11 changes: 5 additions & 6 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ public readonly struct NatsJSMsg<T>
{
private readonly NatsJSContext _context;
private readonly NatsMsg<T> _msg;
private readonly Lazy<(DateTimeOffset? DateTime, long? Sequence)> _replyToDateTimeAndSeq;

internal NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
{
_msg = msg;
_context = context;
_replyToDateTimeAndSeq = new Lazy<(DateTimeOffset? DateTime, long? Sequnce)>(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo));
}

/// <summary>
Expand Down Expand Up @@ -53,12 +55,12 @@ internal NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
/// <summary>
/// The sequence number of the message.
/// </summary>
public long? Sequence => _msg.Seq;
public long? Sequence => _replyToDateTimeAndSeq.Value.Sequence;

/// <summary>
/// The time of the message.
/// </summary>
public DateTimeOffset? DateTime => _msg.DateTime;
public DateTimeOffset? DateTime => _replyToDateTimeAndSeq.Value.DateTime;

/// <summary>
/// Acknowledges the message was completely handled.
Expand Down Expand Up @@ -112,10 +114,7 @@ private ValueTask SendAckAsync(ReadOnlySequence<byte> payload, AckOpts opts = de

return _msg.ReplyAsync(
payload: payload,
opts: new NatsPubOpts
{
WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,
},
opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,},

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 117 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

cancellationToken: cancellationToken);
}
}
Expand Down
13 changes: 0 additions & 13 deletions tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using NATS.Client.JetStream.Internal;

namespace NATS.Client.JetStream.Tests.Internal;

public class ReplyToDateTimeAndSeqTest
{
[Fact]
public void ShouldParseReplyToDateTimeAndSeq()
{
var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0");

dateTime!.Value.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00");
seq.Should().Be(100);
}

[Fact]
public void ShouldSetNullForReturnWhenReplyToIsNull()
{
var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse(null);

dateTime.Should().BeNull();
seq.Should().BeNull();
}

[Fact]
public void ShouldSetNullWhenReplyToIsASimpleReqResponse()
{
var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("_INBOX.1");

dateTime.Should().BeNull();
seq.Should().BeNull();
}
}

0 comments on commit 76ff2bd

Please sign in to comment.