Skip to content

Commit

Permalink
Merge pull request #39 from nicogiard/kafka-messageparser
Browse files Browse the repository at this point in the history
Update the MessageParser to rely on QueueReference instead of HeaderNames.MessageType
  • Loading branch information
mizrael authored Aug 25, 2021
2 parents 6d99475 + bcc94e8 commit ac0b151
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 51 deletions.
5 changes: 4 additions & 1 deletion src/OpenSleigh.Transport.Kafka/IQueueReferenceFactory.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
using OpenSleigh.Core.Messaging;
using System;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.Kafka
{
public interface IQueueReferenceFactory
{
QueueReferences Create<TM>(TM message = default) where TM : IMessage;

Type GetQueueType(string topic);
}
}
2 changes: 1 addition & 1 deletion src/OpenSleigh.Transport.Kafka/KafkaPublisherExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Task<DeliveryResult<Guid, byte[]>> PublishAsync(IMessage message,
return PublishAsyncCore(message, topic, additionalHeaders, cancellationToken);
}

private async Task<DeliveryResult<Guid, byte[]>> PublishAsyncCore(IMessage message,
private async Task<DeliveryResult<Guid, byte[]>> PublishAsyncCore(IMessage message,
string topic,
IEnumerable<Header> additionalHeaders,
CancellationToken cancellationToken)
Expand Down
20 changes: 8 additions & 12 deletions src/OpenSleigh.Transport.Kafka/MessageParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,27 @@ namespace OpenSleigh.Transport.Kafka
{
public class MessageParser : IMessageParser
{
private readonly ITypeResolver _typeResolver;
private readonly ITransportSerializer _serializer;
private readonly IQueueReferenceFactory _queueReferenceFactory;

public MessageParser(ITypeResolver typeResolver, ITransportSerializer serializer)
{
_typeResolver = typeResolver ?? throw new ArgumentNullException(nameof(typeResolver));
public MessageParser(ITransportSerializer serializer, IQueueReferenceFactory queueReferenceFactory)
{
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_queueReferenceFactory = queueReferenceFactory ?? throw new ArgumentNullException(nameof(queueReferenceFactory));
}

public IMessage Parse(ConsumeResult<Guid, byte[]> consumeResult)
{
if (consumeResult is null)
throw new ArgumentNullException(nameof(consumeResult));
if (consumeResult.Message?.Headers is null)
throw new ArgumentNullException(nameof(consumeResult), "message headers are missing");

var messageTypeHeader = consumeResult.Message.Headers.FirstOrDefault(h => h.Key == HeaderNames.MessageType);
if(messageTypeHeader is null)

var messageType = _queueReferenceFactory.GetQueueType(consumeResult.Topic);
if(messageType is null)
throw new ArgumentException("invalid message type");

var messageTypeName = Encoding.UTF8.GetString(messageTypeHeader.GetValueBytes());
var messageType = _typeResolver.Resolve(messageTypeName);
var decodedObj = _serializer.Deserialize(consumeResult.Message.Value, messageType);
if (decodedObj is not IMessage message)
throw new ArgumentException($"message has the wrong type: '{messageTypeName}'");
throw new ArgumentException($"message has the wrong type: '{messageType.FullName}'");
return message;
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/OpenSleigh.Transport.Kafka/QueueReferenceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Concurrent;
using System.Linq;

namespace OpenSleigh.Transport.Kafka
{
Expand Down Expand Up @@ -31,5 +32,15 @@ private QueueReferences CreateCore<TM>()
var creator = _sp.GetService<QueueReferencesPolicy<TM>>();
return (creator is null) ? _defaultCreator(typeof(TM)) : creator();
}

public Type GetQueueType(string topic)
{
if (string.IsNullOrWhiteSpace(topic))
throw new ArgumentNullException(topic);

var queueRef = _queueReferencesCache.FirstOrDefault(pair => topic.Equals(pair.Value.TopicName, StringComparison.InvariantCultureIgnoreCase));

return queueRef.Key;
}
}
}
56 changes: 19 additions & 37 deletions tests/OpenSleigh.Transport.Kafka.Tests/Unit/MessageParserTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,19 @@ public class MessageParserTests
[Fact]
public void Resolve_should_throw_when_input_null()
{
var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
var resolver = NSubstitute.Substitute.For<ITypeResolver>();
var sut = new MessageParser(resolver, decoder);
var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
var sut = new MessageParser(decoder, queueReferenceFactory);

Assert.Throws<ArgumentNullException>(() => sut.Parse(null));
}

[Fact]
public void Resolve_should_throw_when_headers_null()
{
var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
var resolver = NSubstitute.Substitute.For<ITypeResolver>();
var sut = new MessageParser(resolver, decoder);

var consumeResult = new ConsumeResult<Guid, byte[]>();

var ex = Assert.Throws<ArgumentNullException>(() => sut.Parse(consumeResult));
ex.Message.Should().Contain("message headers are missing");
}


[Fact]
public void Resolve_should_throw_when_headers_do_not_contain_message_type()
{
var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
var resolver = NSubstitute.Substitute.For<ITypeResolver>();
var sut = new MessageParser(resolver, decoder);
var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
var sut = new MessageParser(decoder, queueReferenceFactory);

var consumeResult = new ConsumeResult<Guid, byte[]>()
{
Expand All @@ -57,49 +43,45 @@ public void Resolve_should_throw_when_headers_do_not_contain_message_type()
[Fact]
public void Resolve_should_throw_when_message_type_header_does_not_match()
{
var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
var resolver = NSubstitute.Substitute.For<ITypeResolver>();
var sut = new MessageParser(resolver, decoder);
Type messageType = null;
var messageTopic = "lorem";
var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
queueReferenceFactory.GetQueueType(messageTopic).Returns(messageType);
var sut = new MessageParser(decoder, queueReferenceFactory);

var consumeResult = new ConsumeResult<Guid, byte[]>()
{
Topic= messageTopic,
Message = new Message<Guid, byte[]>()
{
Headers = new Headers()
{
{ HeaderNames.MessageType, Encoding.UTF8.GetBytes("lorem")}
}
}
};

var ex = Assert.Throws<ArgumentException>(() => sut.Parse(consumeResult));
ex.Message.Should().Contain("message has the wrong type");
ex.Message.Should().Contain("invalid message type");
}

[Fact]
public void Resolve_should_return_message()
{
var messageType = typeof(DummyMessage);
var message = DummyMessage.New();
var messageTopic = "DummyMessage";
; var message = DummyMessage.New();
var encodedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(message);
var messageBytes = Encoding.UTF8.GetBytes(encodedMessage);

var decoder = NSubstitute.Substitute.For<ITransportSerializer>();
decoder.Deserialize(messageBytes, messageType).Returns(message);

var resolver = NSubstitute.Substitute.For<ITypeResolver>();
resolver.Resolve(messageType.FullName).Returns(messageType);
var queueReferenceFactory = NSubstitute.Substitute.For<IQueueReferenceFactory>();
queueReferenceFactory.GetQueueType(messageTopic).Returns(messageType);

var sut = new MessageParser(resolver, decoder);
var sut = new MessageParser(decoder, queueReferenceFactory);

var consumeResult = new ConsumeResult<Guid, byte[]>()
{
Topic = messageTopic,
Message = new Message<Guid, byte[]>()
{
Headers = new Headers()
{
{HeaderNames.MessageType, Encoding.UTF8.GetBytes(messageType.FullName)}
},
Value = messageBytes
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,39 @@ public void ctor_should_throw_if_service_provider_null()
{
Assert.Throws<ArgumentNullException>(() => new QueueReferenceFactory(null));
}

[Fact]
public void GetQueueType_should_throw_when_input_invalid()
{
var sp = NSubstitute.Substitute.For<IServiceProvider>();
var sut = new QueueReferenceFactory(sp);

Assert.Throws<ArgumentNullException>(() => sut.GetQueueType(null));
Assert.Throws<ArgumentNullException>(() => sut.GetQueueType(""));
Assert.Throws<ArgumentNullException>(() => sut.GetQueueType(" "));
}

[Fact]
public void GetQueueType_should_return_null_when_type_not_found()
{
var sp = NSubstitute.Substitute.For<IServiceProvider>();
var sut = new QueueReferenceFactory(sp);

var result = sut.GetQueueType("invalid topic name");
result.Should().BeNull();
}

[Fact]
public void GetQueueType_should_return_type_when_input_valid()
{
var sp = NSubstitute.Substitute.For<IServiceProvider>();
var sut = new QueueReferenceFactory(sp);

var queueRef = sut.Create<DummyMessage>();
queueRef.Should().NotBeNull();

var result = sut.GetQueueType(queueRef.TopicName);
result.Should().Be(typeof(DummyMessage));
}
}
}

0 comments on commit ac0b151

Please sign in to comment.