From 466b8629a48f876c14f24a6a46c612c92bec26dd Mon Sep 17 00:00:00 2001
From: "Nicolas Giard (ext)" <giard.nicolasext@matmut.fr>
Date: Wed, 25 Aug 2021 13:52:50 +0200
Subject: [PATCH 1/2] feat: update the MessageParser to rely on QueueReference
 instead of HeaderNames.MessageType

---
 .../IQueueReferenceFactory.cs                 |  5 +-
 .../MessageParser.cs                          | 16 +++----
 .../QueueReferenceFactory.cs                  |  5 ++
 .../Unit/MessageParserTests.cs                | 48 +++++++------------
 4 files changed, 34 insertions(+), 40 deletions(-)

diff --git a/src/OpenSleigh.Transport.Kafka/IQueueReferenceFactory.cs b/src/OpenSleigh.Transport.Kafka/IQueueReferenceFactory.cs
index 9f5542cc..d9284969 100644
--- a/src/OpenSleigh.Transport.Kafka/IQueueReferenceFactory.cs
+++ b/src/OpenSleigh.Transport.Kafka/IQueueReferenceFactory.cs
@@ -1,9 +1,12 @@
-using OpenSleigh.Core.Messaging;
+using System;
+using OpenSleigh.Core.Messaging;
 
 namespace OpenSleigh.Transport.Kafka
 {
     public interface IQueueReferenceFactory
     {
         QueueReferences Create<TM>(TM message = default) where TM : IMessage;
+        
+        Type GetQueueType(string topic);
     }
 }
\ No newline at end of file
diff --git a/src/OpenSleigh.Transport.Kafka/MessageParser.cs b/src/OpenSleigh.Transport.Kafka/MessageParser.cs
index d8b29b56..7c8a7a2c 100644
--- a/src/OpenSleigh.Transport.Kafka/MessageParser.cs
+++ b/src/OpenSleigh.Transport.Kafka/MessageParser.cs
@@ -12,29 +12,27 @@ public class MessageParser : IMessageParser
     {
         private readonly ITypeResolver _typeResolver;
         private readonly ITransportSerializer _serializer;
+        private readonly IQueueReferenceFactory _queueReferenceFactory;
 
-        public MessageParser(ITypeResolver typeResolver, ITransportSerializer serializer)
+        public MessageParser(ITypeResolver typeResolver, ITransportSerializer serializer, IQueueReferenceFactory queueReferenceFactory)
         {
             _typeResolver = typeResolver ?? throw new ArgumentNullException(nameof(typeResolver));
             _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
+            _queueReferenceFactory = queueReferenceFactory ?? throw new ArgumentNullException(nameof(queueReferenceFactory));
         }
 
         public IMessage Parse(ConsumeResult<Guid, byte[]> consumeResult)
         {
             if (consumeResult is null)
                 throw new ArgumentNullException(nameof(consumeResult));
-            if (consumeResult.Message?.Headers is null)
-                throw new ArgumentNullException(nameof(consumeResult), "message headers are missing");
-
-            var messageTypeHeader = consumeResult.Message.Headers.FirstOrDefault(h => h.Key == HeaderNames.MessageType);
-            if(messageTypeHeader is null)
+            
+            var messageType = _queueReferenceFactory.GetQueueType(consumeResult.Topic);
+            if(messageType is null) 
                 throw new ArgumentException("invalid message type");
 
-            var messageTypeName = Encoding.UTF8.GetString(messageTypeHeader.GetValueBytes());
-            var messageType = _typeResolver.Resolve(messageTypeName);
             var decodedObj = _serializer.Deserialize(consumeResult.Message.Value, messageType);
             if (decodedObj is not IMessage message)
-                throw new ArgumentException($"message has the wrong type: '{messageTypeName}'");
+                throw new ArgumentException($"message has the wrong type: '{messageType.FullName}'");
             return message;
         }
     }
diff --git a/src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs b/src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs
index 15f3b2da..84225125 100644
--- a/src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs
+++ b/src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs
@@ -2,6 +2,7 @@
 using System;
 using Microsoft.Extensions.DependencyInjection;
 using System.Collections.Concurrent;
+using System.Linq;
 
 namespace OpenSleigh.Transport.Kafka
 {
@@ -31,5 +32,9 @@ private QueueReferences CreateCore<TM>()
             var creator = _sp.GetService<QueueReferencesPolicy<TM>>();
             return (creator is null) ? _defaultCreator(typeof(TM)) : creator();
         }
+
+        public Type GetQueueType(string topic) =>
+            _queueReferencesCache.SingleOrDefault(pair => pair.Value.TopicName.Equals(topic)).Key;
+
     }
 }
\ No newline at end of file
diff --git a/tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs b/tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs
index b1e3f516..8ba01302 100644
--- a/tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs
+++ b/tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs
@@ -16,31 +16,19 @@ public void Resolve_should_throw_when_input_null()
         {
             var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
             var resolver = NSubstitute.Substitute.For<ITypeResolver>();
-            var sut = new MessageParser(resolver, decoder);
+            var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
+            var sut = new MessageParser(resolver, decoder, queueReferenceFactory);
 
             Assert.Throws<ArgumentNullException>(() => sut.Parse(null));
         }
 
-        [Fact]
-        public void Resolve_should_throw_when_headers_null()
-        {
-            var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
-            var resolver = NSubstitute.Substitute.For<ITypeResolver>();
-            var sut = new MessageParser(resolver, decoder);
-
-            var consumeResult = new ConsumeResult<Guid, byte[]>();
-            
-            var ex = Assert.Throws<ArgumentNullException>(() => sut.Parse(consumeResult));
-            ex.Message.Should().Contain("message headers are missing");
-        }
-
-
         [Fact]
         public void Resolve_should_throw_when_headers_do_not_contain_message_type()
         {
             var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
             var resolver = NSubstitute.Substitute.For<ITypeResolver>();
-            var sut = new MessageParser(resolver, decoder);
+            var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
+            var sut = new MessageParser(resolver, decoder, queueReferenceFactory);
 
             var consumeResult = new ConsumeResult<Guid, byte[]>()
             {
@@ -57,30 +45,30 @@ public void Resolve_should_throw_when_headers_do_not_contain_message_type()
         [Fact]
         public void Resolve_should_throw_when_message_type_header_does_not_match()
         {
+            Type messageType = null;
+            var messageTopic = "lorem";
             var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
             var resolver = NSubstitute.Substitute.For<ITypeResolver>();
-            var sut = new MessageParser(resolver, decoder);
+            var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
+            queueReferenceFactory.GetQueueType(messageTopic).Returns(messageType); 
+            var sut = new MessageParser(resolver, decoder, queueReferenceFactory);
 
             var consumeResult = new ConsumeResult<Guid, byte[]>()
             {
+                Topic= messageTopic,
                 Message = new Message<Guid, byte[]>()
-                {
-                    Headers = new Headers()
-                    {
-                        { HeaderNames.MessageType, Encoding.UTF8.GetBytes("lorem")}
-                    }
-                }
             };
             
             var ex = Assert.Throws<ArgumentException>(() => sut.Parse(consumeResult));
-            ex.Message.Should().Contain("message has the wrong type");
+            ex.Message.Should().Contain("invalid message type");
         }
 
         [Fact]
         public void Resolve_should_return_message()
         {
             var messageType = typeof(DummyMessage);
-            var message = DummyMessage.New();
+            var messageTopic = "DummyMessage";
+;            var message = DummyMessage.New();
             var encodedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(message);
             var messageBytes = Encoding.UTF8.GetBytes(encodedMessage);
             
@@ -90,16 +78,16 @@ public void Resolve_should_return_message()
             var resolver = NSubstitute.Substitute.For<ITypeResolver>();
             resolver.Resolve(messageType.FullName).Returns(messageType); 
             
-            var sut = new MessageParser(resolver, decoder);
+            var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
+            queueReferenceFactory.GetQueueType(messageTopic).Returns(messageType); 
+            
+            var sut = new MessageParser(resolver, decoder, queueReferenceFactory);
 
             var consumeResult = new ConsumeResult<Guid, byte[]>()
             {
+                Topic = messageTopic,
                 Message = new Message<Guid, byte[]>()
                 {
-                    Headers = new Headers()
-                    {
-                        {HeaderNames.MessageType, Encoding.UTF8.GetBytes(messageType.FullName)}
-                    },
                     Value = messageBytes
                 }
             };

From bcc94e8787f7ebc49d4b4ee79785fdd684054cff Mon Sep 17 00:00:00 2001
From: David Guida <davidguida@microsoft.com>
Date: Wed, 25 Aug 2021 15:34:29 +0000
Subject: [PATCH 2/2] refactoring, adding more tests

---
 .../KafkaPublisherExecutor.cs                 |  2 +-
 .../MessageParser.cs                          |  6 ++--
 .../QueueReferenceFactory.cs                  | 10 ++++--
 .../Unit/MessageParserTests.cs                | 18 ++++------
 .../Unit/QueueReferenceFactoryTests.cs        | 34 +++++++++++++++++++
 5 files changed, 51 insertions(+), 19 deletions(-)

diff --git a/src/OpenSleigh.Transport.Kafka/KafkaPublisherExecutor.cs b/src/OpenSleigh.Transport.Kafka/KafkaPublisherExecutor.cs
index 07bd8c4d..5d78c632 100644
--- a/src/OpenSleigh.Transport.Kafka/KafkaPublisherExecutor.cs
+++ b/src/OpenSleigh.Transport.Kafka/KafkaPublisherExecutor.cs
@@ -39,7 +39,7 @@ public Task<DeliveryResult<Guid, byte[]>> PublishAsync(IMessage message,
             return PublishAsyncCore(message, topic, additionalHeaders, cancellationToken);
         }
 
-        private async Task<DeliveryResult<Guid, byte[]>> PublishAsyncCore(IMessage message,
+        private async Task<DeliveryResult<Guid, byte[]>> PublishAsyncCore(IMessage message, 
             string topic,
             IEnumerable<Header> additionalHeaders,
             CancellationToken cancellationToken)
diff --git a/src/OpenSleigh.Transport.Kafka/MessageParser.cs b/src/OpenSleigh.Transport.Kafka/MessageParser.cs
index 7c8a7a2c..e2937df5 100644
--- a/src/OpenSleigh.Transport.Kafka/MessageParser.cs
+++ b/src/OpenSleigh.Transport.Kafka/MessageParser.cs
@@ -10,13 +10,11 @@ namespace OpenSleigh.Transport.Kafka
 {
     public class MessageParser : IMessageParser
     {
-        private readonly ITypeResolver _typeResolver;
         private readonly ITransportSerializer _serializer;
         private readonly IQueueReferenceFactory _queueReferenceFactory;
 
-        public MessageParser(ITypeResolver typeResolver, ITransportSerializer serializer, IQueueReferenceFactory queueReferenceFactory)
-        {
-            _typeResolver = typeResolver ?? throw new ArgumentNullException(nameof(typeResolver));
+        public MessageParser(ITransportSerializer serializer, IQueueReferenceFactory queueReferenceFactory)
+        {     
             _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
             _queueReferenceFactory = queueReferenceFactory ?? throw new ArgumentNullException(nameof(queueReferenceFactory));
         }
diff --git a/src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs b/src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs
index 84225125..b30fd5cb 100644
--- a/src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs
+++ b/src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs
@@ -33,8 +33,14 @@ private QueueReferences CreateCore<TM>()
             return (creator is null) ? _defaultCreator(typeof(TM)) : creator();
         }
 
-        public Type GetQueueType(string topic) =>
-            _queueReferencesCache.SingleOrDefault(pair => pair.Value.TopicName.Equals(topic)).Key;
+        public Type GetQueueType(string topic)
+        {
+            if (string.IsNullOrWhiteSpace(topic))
+                throw new ArgumentNullException(topic);
 
+            var queueRef = _queueReferencesCache.FirstOrDefault(pair => topic.Equals(pair.Value.TopicName, StringComparison.InvariantCultureIgnoreCase));
+            
+            return queueRef.Key;
+        }
     }
 }
\ No newline at end of file
diff --git a/tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs b/tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs
index 8ba01302..7b5309af 100644
--- a/tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs
+++ b/tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs
@@ -14,10 +14,9 @@ public class MessageParserTests
         [Fact]
         public void Resolve_should_throw_when_input_null()
         {
-            var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
-            var resolver = NSubstitute.Substitute.For<ITypeResolver>();
+            var decoder = NSubstitute.Substitute.For<ITransportSerializer>();            
             var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
-            var sut = new MessageParser(resolver, decoder, queueReferenceFactory);
+            var sut = new MessageParser(decoder, queueReferenceFactory);
 
             Assert.Throws<ArgumentNullException>(() => sut.Parse(null));
         }
@@ -26,9 +25,8 @@ public void Resolve_should_throw_when_input_null()
         public void Resolve_should_throw_when_headers_do_not_contain_message_type()
         {
             var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
-            var resolver = NSubstitute.Substitute.For<ITypeResolver>();
             var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
-            var sut = new MessageParser(resolver, decoder, queueReferenceFactory);
+            var sut = new MessageParser(decoder, queueReferenceFactory);
 
             var consumeResult = new ConsumeResult<Guid, byte[]>()
             {
@@ -47,11 +45,10 @@ public void Resolve_should_throw_when_message_type_header_does_not_match()
         {
             Type messageType = null;
             var messageTopic = "lorem";
-            var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
-            var resolver = NSubstitute.Substitute.For<ITypeResolver>();
+            var decoder = NSubstitute.Substitute.For<ITransportSerializer>();            
             var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
             queueReferenceFactory.GetQueueType(messageTopic).Returns(messageType); 
-            var sut = new MessageParser(resolver, decoder, queueReferenceFactory);
+            var sut = new MessageParser(decoder, queueReferenceFactory);
 
             var consumeResult = new ConsumeResult<Guid, byte[]>()
             {
@@ -75,13 +72,10 @@ public void Resolve_should_return_message()
             var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
             decoder.Deserialize(messageBytes, messageType).Returns(message); 
             
-            var resolver = NSubstitute.Substitute.For<ITypeResolver>();
-            resolver.Resolve(messageType.FullName).Returns(messageType); 
-            
             var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
             queueReferenceFactory.GetQueueType(messageTopic).Returns(messageType); 
             
-            var sut = new MessageParser(resolver, decoder, queueReferenceFactory);
+            var sut = new MessageParser(decoder, queueReferenceFactory);
 
             var consumeResult = new ConsumeResult<Guid, byte[]>()
             {
diff --git a/tests/OpenSleigh.Transport.Kafka.Tests/Unit/QueueReferenceFactoryTests.cs b/tests/OpenSleigh.Transport.Kafka.Tests/Unit/QueueReferenceFactoryTests.cs
index 97a57a4c..70c76beb 100644
--- a/tests/OpenSleigh.Transport.Kafka.Tests/Unit/QueueReferenceFactoryTests.cs
+++ b/tests/OpenSleigh.Transport.Kafka.Tests/Unit/QueueReferenceFactoryTests.cs
@@ -69,5 +69,39 @@ public void ctor_should_throw_if_service_provider_null()
         {
             Assert.Throws<ArgumentNullException>(() => new QueueReferenceFactory(null));
         }
+
+        [Fact]
+        public void GetQueueType_should_throw_when_input_invalid()
+        {
+            var sp = NSubstitute.Substitute.For<IServiceProvider>();
+            var sut = new QueueReferenceFactory(sp);
+
+            Assert.Throws<ArgumentNullException>(() => sut.GetQueueType(null));
+            Assert.Throws<ArgumentNullException>(() => sut.GetQueueType(""));
+            Assert.Throws<ArgumentNullException>(() => sut.GetQueueType("   "));
+        }
+
+        [Fact]
+        public void GetQueueType_should_return_null_when_type_not_found()
+        {
+            var sp = NSubstitute.Substitute.For<IServiceProvider>();
+            var sut = new QueueReferenceFactory(sp);
+
+            var result = sut.GetQueueType("invalid topic name");
+            result.Should().BeNull();
+        }
+
+        [Fact]
+        public void GetQueueType_should_return_type_when_input_valid()
+        {
+            var sp = NSubstitute.Substitute.For<IServiceProvider>();
+            var sut = new QueueReferenceFactory(sp);
+
+            var queueRef = sut.Create<DummyMessage>();
+            queueRef.Should().NotBeNull();
+            
+            var result = sut.GetQueueType(queueRef.TopicName);
+            result.Should().Be(typeof(DummyMessage));
+        }
     }
 }