Skip to content

Commit

Permalink
Added Integration test for ASB as well patching more properties throu…
Browse files Browse the repository at this point in the history
…gh to the infrastructure #1442
  • Loading branch information
preardon committed Sep 25, 2021
1 parent b51baa9 commit 3604b77
Show file tree
Hide file tree
Showing 17 changed files with 225 additions and 15 deletions.
2 changes: 0 additions & 2 deletions samples/ASBTaskQueue/GreetingsReceiverConsole/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public async static Task Main(string[] args)
var subscriptions = new Subscription[]
{
new AzureServiceBusSubscription<GreetingAsyncEvent>(
typeof(GreetingAsyncEvent),
new SubscriptionName(GreetingEventAsyncMessageMapper.Topic),
new ChannelName("paramore.example.greeting"),
new RoutingKey(GreetingEventAsyncMessageMapper.Topic),
Expand All @@ -35,7 +34,6 @@ public async static Task Main(string[] args)
requeueCount: 3,
isAsync: true),
new AzureServiceBusSubscription<GreetingEvent>(
typeof(GreetingEvent),
new SubscriptionName(GreetingEventMessageMapper.Topic),
new ChannelName("paramore.example.greeting"),
new RoutingKey(GreetingEventMessageMapper.Topic),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public static IBrighterBuilder UseInMemoryOutbox(this IBrighterBuilder brighterB
/// <param name="useRequestResponseQueues">Add support for RPC over MoM by using a reply queue</param>
/// <param name="replyQueueSubscriptions">Reply queue subscription</param>
/// <returns>The Brighter builder to allow chaining of requests</returns>
public static IBrighterHandlerBuilder UseExternalBus(this IBrighterHandlerBuilder brighterBuilder, IAmAMessageProducer producer, bool useRequestResponseQueues = false, IEnumerable<Subscription> replyQueueSubscriptions = null)
public static IBrighterBuilder UseExternalBus(this IBrighterBuilder brighterBuilder, IAmAMessageProducer producer, bool useRequestResponseQueues = false, IEnumerable<Subscription> replyQueueSubscriptions = null)
{
brighterBuilder.Services.AddSingleton(producer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private Message MapToBrighterMessage(IBrokeredMessageWrapper azureServiceBusMess
_topicName, _subscriptionName, messageBody);
MessageType messageType = GetMessageType(azureServiceBusMessage);
var handledCount = GetHandledCount(azureServiceBusMessage);
var headers = new MessageHeader(Guid.NewGuid(), _topicName, messageType, DateTime.UtcNow, handledCount, 0);
var headers = new MessageHeader(azureServiceBusMessage.Id, _topicName, messageType, DateTime.UtcNow, handledCount, 0, azureServiceBusMessage.CorrelationId, contentType: azureServiceBusMessage.ContentType);
if(_receiveMode.Equals(ReceiveMode.PeekLock)) headers.Bag.Add(_lockTokenKey, azureServiceBusMessage.LockToken);
var message = new Message(headers, new MessageBody(messageBody));
return message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ public AzureServiceBusConsumerFactory(AzureServiceBusConfiguration configuration

public IAmAMessageConsumer Create(Subscription subscription)
{
var nameSpaceManagerWrapper = new ManagementClientWrapper(_configuration);
return Create(subscription, _configuration);
}

public static IAmAMessageConsumer Create(Subscription subscription, AzureServiceBusConfiguration configuration)
{
var nameSpaceManagerWrapper = new ManagementClientWrapper(configuration);

return new AzureServiceBusConsumer(subscription.RoutingKey, subscription.ChannelName,
new AzureServiceBusMessageProducer(nameSpaceManagerWrapper,
new TopicClientProvider(_configuration), subscription.MakeChannels), nameSpaceManagerWrapper,
new MessageReceiverProvider(_configuration),
new TopicClientProvider(configuration), subscription.MakeChannels), nameSpaceManagerWrapper,
new MessageReceiverProvider(configuration),
makeChannels: subscription.MakeChannels,
receiveMode: _configuration.AckOnRead ? ReceiveMode.PeekLock : ReceiveMode.ReceiveAndDelete,
receiveMode: configuration.AckOnRead ? ReceiveMode.PeekLock : ReceiveMode.ReceiveAndDelete,
batchSize: subscription.BufferSize);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public async Task SendWithDelayAsync(Message message, int delayMilliseconds = 0)
var azureServiceBusMessage = new Microsoft.Azure.ServiceBus.Message(message.Body.Bytes);
azureServiceBusMessage.UserProperties.Add("MessageType", message.Header.MessageType.ToString());
azureServiceBusMessage.UserProperties.Add("HandledCount", message.Header.HandledCount);
azureServiceBusMessage.CorrelationId = message.Header.CorrelationId.ToString();
azureServiceBusMessage.ContentType = message.Header.ContentType;
azureServiceBusMessage.MessageId = message.Header.Id.ToString();
if (delayMilliseconds == 0)
{
await topicClient.SendAsync(azureServiceBusMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public AzureServiceBusSubscription(
RoutingKey routingKey = null,
int bufferSize = 1,
int noOfPerformers = 1,
int timeoutInMilliseconds = 300,
int timeoutInMilliseconds = 400,
int requeueCount = -1,
int requeueDelayInMilliseconds = 0,
int unacceptableMessageLimit = 0,
Expand All @@ -28,20 +28,19 @@ public AzureServiceBusSubscription(
public class AzureServiceBusSubscription<T> : AzureServiceBusSubscription where T : IRequest
{
public AzureServiceBusSubscription(
Type dataType,
SubscriptionName name = null,
ChannelName channelName = null,
RoutingKey routingKey = null,
int bufferSize = 1,
int noOfPerformers = 1,
int timeoutInMilliseconds = 300,
int timeoutInMilliseconds = 400,
int requeueCount = -1,
int requeueDelayInMilliseconds = 0,
int unacceptableMessageLimit = 0,
bool isAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds,
: base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds,
requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, isAsync, channelFactory,
makeChannels)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;

namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers
{
Expand All @@ -16,5 +17,25 @@ public BrokeredMessageWrapper(Microsoft.Azure.ServiceBus.Message brokeredMessage
public IDictionary<string, object> UserProperties => _brokeredMessage.UserProperties;

public string LockToken => _brokeredMessage.SystemProperties.LockToken;

public Guid Id
{
get
{
return Guid.Parse(_brokeredMessage.MessageId);
}
}

public Guid CorrelationId
{
get
{
if (string.IsNullOrEmpty(_brokeredMessage.CorrelationId))
return Guid.Empty;
return Guid.Parse(_brokeredMessage.CorrelationId);
}
}

public string ContentType{ get => _brokeredMessage.ContentType; }
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;

namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers
{
Expand All @@ -9,5 +10,10 @@ public interface IBrokeredMessageWrapper
IDictionary<string, object> UserProperties { get; }

string LockToken { get; }

Guid Id { get; }
Guid CorrelationId { get; }

string ContentType { get; }
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers
using System.Threading.Tasks;

namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers
{
public interface IManagementClientWrapper
{
bool TopicExists(string topic);

void CreateTopic(string topic);

Task DeleteTopicAsync(string topic);

bool SubscriptionExists(string topicName, string subscriptionName);

void CreateSubscription(string topicName, string subscriptionName, int maxDeliveryCount);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;
Expand Down Expand Up @@ -90,6 +91,20 @@ public void CreateTopic(string topic)
s_logger.LogInformation("Topic {Topic} created.", topic);
}

public async Task DeleteTopicAsync(string topic)
{
s_logger.LogInformation("Deleting topic {Topic}...", topic);
try
{
await _managementClient.DeleteTopicAsync(topic);
s_logger.LogInformation("Topic {Topic} successfully deleted", topic);
}
catch (Exception e)
{
s_logger.LogError(e,"Failed to delete Topic {Topic}", topic);
}
}

public bool SubscriptionExists(string topicName, string subscriptionName)
{
s_logger.LogDebug("Checking if subscription {ChannelName} for topic {Topic} exists...", subscriptionName, topicName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;

namespace Paramore.Brighter.AzureServiceBus.Tests.MessagingGateway
{
internal static class ASBCreds
{
public static string ASBConnectionString { get
{
var connString = Environment.GetEnvironmentVariable("BrighterTestsASBConnectionString");
if (string.IsNullOrEmpty(connString))
throw new Exception("ASB ConnectionString not set");
return connString;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace Paramore.Brighter.AzureServiceBus.Tests.MessagingGateway
{
public static class StringHelper
{
public static string Truncate(this string str, int maxLength)
{
str = str.Trim();
int right = Math.Min(str.Length, maxLength);
return str.Substring(0, right);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System;
using System.Text.Json;
using System.Threading.Tasks;
using FluentAssertions;
using Paramore.Brighter.AzureServiceBus.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.AzureServiceBus;
using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers;
using Xunit;

namespace Paramore.Brighter.AzureServiceBus.Tests.MessagingGateway
{
[Trait("Category", "ASB")]
public class ASBProducerTests : IDisposable
{
private readonly Message _message;
private readonly IAmAChannel _channel;
private readonly AzureServiceBusMessageProducer _messageProducer;
private readonly AzureServiceBusChannelFactory _channelFactory;
private readonly ASBTestCommand _command;
private readonly Guid _correlationId;
private readonly string _contentType;
private readonly string _topicName;
private readonly IManagementClientWrapper _managementClient;

public ASBProducerTests()
{
_command = new ASBTestCommand()
{
CommandValue = "Do the things.",
CommandNumber = 26
};

_correlationId = Guid.NewGuid();
var channelName = $"Producer-Send-Tests-{Guid.NewGuid()}".Truncate(50);
_topicName = $"Producer-Send-Tests-{Guid.NewGuid()}";
var routingKey = new RoutingKey(_topicName);

AzureServiceBusSubscription<ASBTestCommand> subscription = new(
name: new SubscriptionName(channelName),
channelName: new ChannelName(channelName),
routingKey: routingKey
);

_contentType = "application/json";

_message = new Message(
new MessageHeader(_command.Id, _topicName, MessageType.MT_COMMAND, _correlationId, contentType: _contentType),
new MessageBody(JsonSerializer.Serialize(_command, JsonSerialisationOptions.Options))
);

var config = new AzureServiceBusConfiguration(ASBCreds.ASBConnectionString, false);

_managementClient = new ManagementClientWrapper(config);
_managementClient.CreateSubscription(_topicName, channelName, 5);

_channelFactory = new AzureServiceBusChannelFactory(new AzureServiceBusConsumerFactory(config));
_channel = _channelFactory.CreateChannel(subscription);

_messageProducer = AzureServiceBusMessageProducerFactory.Get(config);
}

[Fact]
public async Task When_posting_a_message_via_the_producer()
{
//arrange
await _messageProducer.SendAsync(_message);

var message = _channel.Receive(5000);

//clear the queue
_channel.Acknowledge(message);

message.Header.MessageType.Should().Be(MessageType.MT_COMMAND);

message.Id.Should().Be(_command.Id);
message.Redelivered.Should().BeFalse();
message.Header.Id.Should().Be(_command.Id);
message.Header.Topic.Should().Contain(_topicName);
message.Header.CorrelationId.Should().Be(_correlationId);
message.Header.ContentType.Should().Be(_contentType);
message.Header.HandledCount.Should().Be(0);
//allow for clock drift in the following test, more important to have a contemporary timestamp than anything
message.Header.TimeStamp.Should().BeAfter(RoundToSeconds(DateTime.UtcNow.AddMinutes(-1)));
message.Header.DelayedMilliseconds.Should().Be(0);
//{"Id":"cd581ced-c066-4322-aeaf-d40944de8edd","Value":"Test","WasCancelled":false,"TaskCompleted":false}
message.Body.Value.Should().Be(_message.Body.Value);
}

public void Dispose()
{
_managementClient.DeleteTopicAsync(_topicName).GetAwaiter().GetResult();
}

private DateTime RoundToSeconds(DateTime dateTime)
{
return new DateTime(dateTime.Ticks - (dateTime.Ticks % TimeSpan.TicksPerSecond), dateTime.Kind);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
<PackageReference Include="Moq" Version="4.16.1" />
<PackageReference Include="xunit" Version="2.4.1" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace Paramore.Brighter.AzureServiceBus.Tests.TestDoubles
{
public class ASBTestCommand : Command
{
public ASBTestCommand() : base(Guid.NewGuid())
{
}

public string CommandValue { get; set; }
public int CommandNumber { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace Paramore.Brighter.AzureServiceBus.Tests.TestDoubles
{
public class ASBTestEvent : Event
{
public ASBTestEvent() : base(Guid.NewGuid())
{
}

public string EventName { get; set; }
public int EventNumber { get; set; }
}
}
1 change: 1 addition & 0 deletions tests/Paramore.Brighter.AzureServiceBus.Tests/readme.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
To run the integration tests the Environemnt Variable BrighterTestsASBConnectionString must be set to the connection string i.e. Endpoint=sb://my-service-bus.servicebus.windows.net/;Authentication=Managed Identity

0 comments on commit 3604b77

Please sign in to comment.