diff --git a/src/NATS.Client/JetStream/MessageInfo.cs b/src/NATS.Client/JetStream/MessageInfo.cs
index d32f712f7..b8c0e6ae4 100644
--- a/src/NATS.Client/JetStream/MessageInfo.cs
+++ b/src/NATS.Client/JetStream/MessageInfo.cs
@@ -45,7 +45,7 @@ private void Init(Msg msg, bool fromDirect, string streamName)
if (fromDirect)
{
Headers = msg.Header;
- Subject = msg.Header[JetStreamConstants.NatsSubject];
+ Subject = msg.Header.GetLast(JetStreamConstants.NatsSubject);
Data = msg.Data;
Sequence = ulong.Parse(msg.Header[JetStreamConstants.NatsSequence]);
Time = JsonUtils.AsDate(msg.Header[JetStreamConstants.NatsTimestamp]);
diff --git a/src/NATS.Client/MsgHeader.cs b/src/NATS.Client/MsgHeader.cs
index 85cbd05af..adae9bb49 100644
--- a/src/NATS.Client/MsgHeader.cs
+++ b/src/NATS.Client/MsgHeader.cs
@@ -249,6 +249,30 @@ public string[] GetValues(string name)
return nvc.GetValues(name);
}
+ ///
+ /// Gets the first value for the specific key.
+ /// Will be null if the key is not found
+ ///
+ ///
+ ///
+ public string GetFirst(string name)
+ {
+ string[] all = nvc.GetValues(name);
+ return all == null ? null: all[0];
+ }
+
+ ///
+ /// Gets the last value for the specific key.
+ /// Will be null if the key is not found
+ ///
+ ///
+ ///
+ public string GetLast(string name)
+ {
+ string[] all = nvc.GetValues(name);
+ return all == null ? null: all[all.Length-1];
+ }
+
///
/// Returns an enumerator that iterates through the message header
/// keys.
diff --git a/src/Tests/IntegrationTests/TestJetStreamManagement.cs b/src/Tests/IntegrationTests/TestJetStreamManagement.cs
index 59b3574b0..b3603d912 100644
--- a/src/Tests/IntegrationTests/TestJetStreamManagement.cs
+++ b/src/Tests/IntegrationTests/TestJetStreamManagement.cs
@@ -17,6 +17,7 @@
using NATS.Client;
using NATS.Client.Internals;
using NATS.Client.JetStream;
+using NATS.Client.KeyValue;
using Xunit;
using static IntegrationTests.JetStreamTestBase;
using static UnitTests.TestBase;
@@ -1069,5 +1070,52 @@ private void ValidateMgr(ulong seq, string lastBySubject, string nextBySubject,
Assert.Equal(lastBySubject != null, mgr.IsLastBySubject);
Assert.Equal(nextBySubject != null, mgr.IsNextBySubject);
}
+
+ [Fact]
+ public void TestDirectMessageRepublishedSubject()
+ {
+ Context.RunInJsServer(c =>
+ {
+ IJetStreamManagement jsm = c.CreateJetStreamManagementContext();
+ String streamBucketName = "sb-" + Variant(null);
+ String subject = Subject();
+ String streamSubject = subject + ".>";
+ String publishSubject1 = subject + ".one";
+ String publishSubject2 = subject + ".two";
+ String publishSubject3 = subject + ".three";
+ String republishDest = "$KV." + streamBucketName + ".>";
+
+ StreamConfiguration sc = StreamConfiguration.Builder()
+ .WithName(streamBucketName)
+ .WithStorageType(StorageType.Memory)
+ .WithSubjects(streamSubject)
+ .WithRepublish(Republish.Builder().WithSource(">").WithDestination(republishDest).Build())
+ .Build();
+ jsm.AddStream(sc);
+
+ KeyValueConfiguration kvc = KeyValueConfiguration.Builder().WithName(streamBucketName).Build();
+ c.CreateKeyValueManagementContext().Create(kvc);
+ IKeyValue kv = c.CreateKeyValueContext(streamBucketName);
+
+ c.Publish(publishSubject1, Encoding.UTF8.GetBytes("uno"));
+ c.CreateJetStreamContext().Publish(publishSubject2, Encoding.UTF8.GetBytes("dos"));
+ kv.Put(publishSubject3, "tres");
+
+ KeyValueEntry kve1 = kv.Get(publishSubject1);
+ Assert.Equal(streamBucketName, kve1.Bucket);
+ Assert.Equal(publishSubject1, kve1.Key);
+ Assert.Equal("uno", kve1.ValueAsString());
+
+ KeyValueEntry kve2 = kv.Get(publishSubject2);
+ Assert.Equal(streamBucketName, kve2.Bucket);
+ Assert.Equal(publishSubject2, kve2.Key);
+ Assert.Equal("dos", kve2.ValueAsString());
+
+ KeyValueEntry kve3 = kv.Get(publishSubject3);
+ Assert.Equal(streamBucketName, kve3.Bucket);
+ Assert.Equal(publishSubject3, kve3.Key);
+ Assert.Equal("tres", kve3.ValueAsString());
+ });
+ }
}
}
diff --git a/src/Tests/UnitTests/TestMessageHeaders.cs b/src/Tests/UnitTests/TestMessageHeaders.cs
index 4cc519385..ec0563755 100644
--- a/src/Tests/UnitTests/TestMessageHeaders.cs
+++ b/src/Tests/UnitTests/TestMessageHeaders.cs
@@ -222,15 +222,21 @@ public void TestHeaderMultiValueSerialization()
public void TestHeaderMultiValues()
{
var mh = new MsgHeader();
+ Assert.Null(mh.GetValues("foo"));
+ Assert.Null(mh.GetFirst("foo"));
+ Assert.Null(mh.GetLast("foo"));
+
mh.Add("foo", "bar");
mh.Add("foo", "baz,comma");
// Test the GetValues API, don't make assumptions about order.
- string []values = mh.GetValues("foo");
+ string[] values = mh.GetValues("foo");
Assert.True(values.Length == 2);
List results = new List(values);
Assert.Contains("bar", results);
Assert.Contains("baz,comma", results);
+ Assert.Equal(values[0], mh.GetFirst("foo"));
+ Assert.Equal(values[1], mh.GetLast("foo"));
byte[] bytes = mh.ToByteArray();
var hsr = new HeaderStatusReader(bytes, bytes.Length);