From ffc5f5ee74b196e884677a0350acc8e782d5bfe1 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 18 Oct 2023 14:22:57 -0400 Subject: [PATCH] Direct Message Subject Header May Contain Multiple Subjects --- src/NATS.Client/JetStream/MessageInfo.cs | 2 +- src/NATS.Client/MsgHeader.cs | 24 ++++++++++ .../TestJetStreamManagement.cs | 48 +++++++++++++++++++ src/Tests/UnitTests/TestMessageHeaders.cs | 8 +++- 4 files changed, 80 insertions(+), 2 deletions(-) diff --git a/src/NATS.Client/JetStream/MessageInfo.cs b/src/NATS.Client/JetStream/MessageInfo.cs index d32f712f7..b8c0e6ae4 100644 --- a/src/NATS.Client/JetStream/MessageInfo.cs +++ b/src/NATS.Client/JetStream/MessageInfo.cs @@ -45,7 +45,7 @@ private void Init(Msg msg, bool fromDirect, string streamName) if (fromDirect) { Headers = msg.Header; - Subject = msg.Header[JetStreamConstants.NatsSubject]; + Subject = msg.Header.GetLast(JetStreamConstants.NatsSubject); Data = msg.Data; Sequence = ulong.Parse(msg.Header[JetStreamConstants.NatsSequence]); Time = JsonUtils.AsDate(msg.Header[JetStreamConstants.NatsTimestamp]); diff --git a/src/NATS.Client/MsgHeader.cs b/src/NATS.Client/MsgHeader.cs index 85cbd05af..adae9bb49 100644 --- a/src/NATS.Client/MsgHeader.cs +++ b/src/NATS.Client/MsgHeader.cs @@ -249,6 +249,30 @@ public string[] GetValues(string name) return nvc.GetValues(name); } + /// + /// Gets the first value for the specific key. + /// Will be null if the key is not found + /// + /// + /// + public string GetFirst(string name) + { + string[] all = nvc.GetValues(name); + return all == null ? null: all[0]; + } + + /// + /// Gets the last value for the specific key. + /// Will be null if the key is not found + /// + /// + /// + public string GetLast(string name) + { + string[] all = nvc.GetValues(name); + return all == null ? null: all[all.Length-1]; + } + /// /// Returns an enumerator that iterates through the message header /// keys. diff --git a/src/Tests/IntegrationTests/TestJetStreamManagement.cs b/src/Tests/IntegrationTests/TestJetStreamManagement.cs index 59b3574b0..b3603d912 100644 --- a/src/Tests/IntegrationTests/TestJetStreamManagement.cs +++ b/src/Tests/IntegrationTests/TestJetStreamManagement.cs @@ -17,6 +17,7 @@ using NATS.Client; using NATS.Client.Internals; using NATS.Client.JetStream; +using NATS.Client.KeyValue; using Xunit; using static IntegrationTests.JetStreamTestBase; using static UnitTests.TestBase; @@ -1069,5 +1070,52 @@ private void ValidateMgr(ulong seq, string lastBySubject, string nextBySubject, Assert.Equal(lastBySubject != null, mgr.IsLastBySubject); Assert.Equal(nextBySubject != null, mgr.IsNextBySubject); } + + [Fact] + public void TestDirectMessageRepublishedSubject() + { + Context.RunInJsServer(c => + { + IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); + String streamBucketName = "sb-" + Variant(null); + String subject = Subject(); + String streamSubject = subject + ".>"; + String publishSubject1 = subject + ".one"; + String publishSubject2 = subject + ".two"; + String publishSubject3 = subject + ".three"; + String republishDest = "$KV." + streamBucketName + ".>"; + + StreamConfiguration sc = StreamConfiguration.Builder() + .WithName(streamBucketName) + .WithStorageType(StorageType.Memory) + .WithSubjects(streamSubject) + .WithRepublish(Republish.Builder().WithSource(">").WithDestination(republishDest).Build()) + .Build(); + jsm.AddStream(sc); + + KeyValueConfiguration kvc = KeyValueConfiguration.Builder().WithName(streamBucketName).Build(); + c.CreateKeyValueManagementContext().Create(kvc); + IKeyValue kv = c.CreateKeyValueContext(streamBucketName); + + c.Publish(publishSubject1, Encoding.UTF8.GetBytes("uno")); + c.CreateJetStreamContext().Publish(publishSubject2, Encoding.UTF8.GetBytes("dos")); + kv.Put(publishSubject3, "tres"); + + KeyValueEntry kve1 = kv.Get(publishSubject1); + Assert.Equal(streamBucketName, kve1.Bucket); + Assert.Equal(publishSubject1, kve1.Key); + Assert.Equal("uno", kve1.ValueAsString()); + + KeyValueEntry kve2 = kv.Get(publishSubject2); + Assert.Equal(streamBucketName, kve2.Bucket); + Assert.Equal(publishSubject2, kve2.Key); + Assert.Equal("dos", kve2.ValueAsString()); + + KeyValueEntry kve3 = kv.Get(publishSubject3); + Assert.Equal(streamBucketName, kve3.Bucket); + Assert.Equal(publishSubject3, kve3.Key); + Assert.Equal("tres", kve3.ValueAsString()); + }); + } } } diff --git a/src/Tests/UnitTests/TestMessageHeaders.cs b/src/Tests/UnitTests/TestMessageHeaders.cs index 4cc519385..ec0563755 100644 --- a/src/Tests/UnitTests/TestMessageHeaders.cs +++ b/src/Tests/UnitTests/TestMessageHeaders.cs @@ -222,15 +222,21 @@ public void TestHeaderMultiValueSerialization() public void TestHeaderMultiValues() { var mh = new MsgHeader(); + Assert.Null(mh.GetValues("foo")); + Assert.Null(mh.GetFirst("foo")); + Assert.Null(mh.GetLast("foo")); + mh.Add("foo", "bar"); mh.Add("foo", "baz,comma"); // Test the GetValues API, don't make assumptions about order. - string []values = mh.GetValues("foo"); + string[] values = mh.GetValues("foo"); Assert.True(values.Length == 2); List results = new List(values); Assert.Contains("bar", results); Assert.Contains("baz,comma", results); + Assert.Equal(values[0], mh.GetFirst("foo")); + Assert.Equal(values[1], mh.GetLast("foo")); byte[] bytes = mh.ToByteArray(); var hsr = new HeaderStatusReader(bytes, bytes.Length);