diff --git a/OpenSleigh.sln b/OpenSleigh.sln index fd459e88..0668a2f3 100644 --- a/OpenSleigh.sln +++ b/OpenSleigh.sln @@ -27,6 +27,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.SQL" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.SQL.Tests", "tests\OpenSleigh.Persistence.SQL.Tests\OpenSleigh.Persistence.SQL.Tests.csproj", "{0799A41D-483D-412C-BD10-E2BB3FB907E4}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Transport.AzureServiceBus", "src\OpenSleigh.Transport.AzureServiceBus\OpenSleigh.Transport.AzureServiceBus.csproj", "{CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenSleigh.Transport.AzureServiceBus.Tests", "tests\OpenSleigh.Transport.AzureServiceBus.Tests\OpenSleigh.Transport.AzureServiceBus.Tests.csproj", "{483B93A9-3650-4823-ADE2-CA4A1B71EEF6}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -73,6 +77,14 @@ Global {0799A41D-483D-412C-BD10-E2BB3FB907E4}.Debug|Any CPU.Build.0 = Debug|Any CPU {0799A41D-483D-412C-BD10-E2BB3FB907E4}.Release|Any CPU.ActiveCfg = Release|Any CPU {0799A41D-483D-412C-BD10-E2BB3FB907E4}.Release|Any CPU.Build.0 = Release|Any CPU + {CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}.Release|Any CPU.Build.0 = Release|Any CPU + {483B93A9-3650-4823-ADE2-CA4A1B71EEF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {483B93A9-3650-4823-ADE2-CA4A1B71EEF6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {483B93A9-3650-4823-ADE2-CA4A1B71EEF6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {483B93A9-3650-4823-ADE2-CA4A1B71EEF6}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -86,6 +98,8 @@ Global {2E5F3E44-A3E7-429A-81BB-FC907F0D5377} = {86CDC8FD-5E6F-4F45-A073-9F2749192582} {5B2D9A6B-4ECA-4F73-B654-AEF16485935F} = {5594CC89-F905-46B2-B938-27B8050D9CA3} {0799A41D-483D-412C-BD10-E2BB3FB907E4} = {5594CC89-F905-46B2-B938-27B8050D9CA3} + {CA4B0C5D-200A-4D3D-8C5A-D0910E11585D} = {86CDC8FD-5E6F-4F45-A073-9F2749192582} + {483B93A9-3650-4823-ADE2-CA4A1B71EEF6} = {86CDC8FD-5E6F-4F45-A073-9F2749192582} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {D5297242-16B4-43D7-B329-362EBCE2A5A5} diff --git a/src/OpenSleigh.Core/DependencyInjection/ServiceCollectionExtensions.cs b/src/OpenSleigh.Core/DependencyInjection/ServiceCollectionExtensions.cs index 289f3e1b..cb927143 100644 --- a/src/OpenSleigh.Core/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/OpenSleigh.Core/DependencyInjection/ServiceCollectionExtensions.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.DependencyInjection; using System; using System.Diagnostics.CodeAnalysis; +using System.Linq; using OpenSleigh.Core.BackgroundServices; using OpenSleigh.Core.Messaging; using OpenSleigh.Core.Utils; @@ -41,6 +42,13 @@ public static IServiceCollection AddOpenSleigh(this IServiceCollection services, return services; } + + public static IServiceCollection AddBusSubscriber(this IServiceCollection services, Type subscriberType) + { + if (!services.Any(s => s.ImplementationType == subscriberType)) + services.AddSingleton(typeof(ISubscriber), subscriberType); + return services; + } } } \ No newline at end of file diff --git a/src/OpenSleigh.Core/OpenSleigh.Core.csproj b/src/OpenSleigh.Core/OpenSleigh.Core.csproj index 5065225b..a1de0bee 100644 --- a/src/OpenSleigh.Core/OpenSleigh.Core.csproj +++ b/src/OpenSleigh.Core/OpenSleigh.Core.csproj @@ -2,7 +2,7 @@ net5.0 - 0.9.2 + 0.10.0 true davidguida OpenSleigh diff --git a/src/OpenSleigh.Core/Utils/SagaUtils.cs b/src/OpenSleigh.Core/Utils/SagaUtils.cs new file mode 100644 index 00000000..b53838d2 --- /dev/null +++ b/src/OpenSleigh.Core/Utils/SagaUtils.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace OpenSleigh.Core.Utils +{ + public static class SagaUtils + where TS : Saga + where TD : SagaState + { + public static IEnumerable GetHandledMessageTypes() + { + var sagaType = typeof(TS); + var messageHandlerType = typeof(IHandleMessage<>).GetGenericTypeDefinition(); + var interfaces = sagaType.GetInterfaces(); + foreach (var i in interfaces) + { + if (!i.IsGenericType) + continue; + + var openGeneric = i.GetGenericTypeDefinition(); + if (!openGeneric.IsAssignableFrom(messageHandlerType)) + continue; + + var messageType = i.GetGenericArguments().First(); + yield return messageType; + } + } + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Persistence.InMemory/InMemorySagaConfiguratorExtensions.cs b/src/OpenSleigh.Persistence.InMemory/InMemorySagaConfiguratorExtensions.cs index fa9bbe92..f169ced9 100644 --- a/src/OpenSleigh.Persistence.InMemory/InMemorySagaConfiguratorExtensions.cs +++ b/src/OpenSleigh.Persistence.InMemory/InMemorySagaConfiguratorExtensions.cs @@ -6,6 +6,7 @@ using System.Reflection; using System.Threading.Channels; using OpenSleigh.Core.Messaging; +using OpenSleigh.Core.Utils; using OpenSleigh.Persistence.InMemory.Messaging; namespace OpenSleigh.Persistence.InMemory @@ -14,28 +15,17 @@ namespace OpenSleigh.Persistence.InMemory public static class InMemorySagaConfiguratorExtensions { private static readonly MethodInfo RawRegisterMessageMethod = typeof(InMemorySagaConfiguratorExtensions) - .GetMethod("RegisterMessage", BindingFlags.Static | BindingFlags.NonPublic); + .GetMethod(nameof(RegisterMessage), BindingFlags.Static | BindingFlags.NonPublic); public static ISagaConfigurator UseInMemoryTransport(this ISagaConfigurator sagaConfigurator) where TS : Saga where TD : SagaState { - var sagaType = typeof(TS); - var messageHandlerType = typeof(IHandleMessage<>).GetGenericTypeDefinition(); - var interfaces = sagaType.GetInterfaces(); - foreach (var i in interfaces) + var messageTypes = SagaUtils.GetHandledMessageTypes(); + foreach (var messageType in messageTypes) { - if (!i.IsGenericType) - continue; - - var openGeneric = i.GetGenericTypeDefinition(); - if (!openGeneric.IsAssignableFrom(messageHandlerType)) - continue; - - var messageType = i.GetGenericArguments().First(); - var registerMessageMethod = RawRegisterMessageMethod.MakeGenericMethod(messageType); - registerMessageMethod.Invoke(null, new[] {sagaConfigurator.Services}); + registerMessageMethod.Invoke(null, new[] { sagaConfigurator.Services }); } return sagaConfigurator; @@ -55,7 +45,7 @@ private static void RegisterMessage(IServiceCollection services) where TM : { var channel = ctx.GetService>(); return channel?.Writer; - }).AddSingleton>(); + }).AddBusSubscriber(typeof(InMemorySubscriber)); } } } \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/AzureServiceBusConfiguratorExtensions.cs b/src/OpenSleigh.Transport.AzureServiceBus/AzureServiceBusConfiguratorExtensions.cs new file mode 100644 index 00000000..d4f870d0 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/AzureServiceBusConfiguratorExtensions.cs @@ -0,0 +1,40 @@ +using System; +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.DependencyInjection; +using OpenSleigh.Core.DependencyInjection; +using OpenSleigh.Core.Messaging; +using Microsoft.Extensions.Azure; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + [ExcludeFromCodeCoverage] + public record AzureServiceBusConfiguration(string ConnectionString); + + [ExcludeFromCodeCoverage] + public static class AzureServiceBusConfiguratorExtensions + { + public static IBusConfigurator UseAzureServiceBusTransport(this IBusConfigurator busConfigurator, + AzureServiceBusConfiguration config, + Action builderFunc = null) + { + busConfigurator.Services.AddAzureClients(builder => + { + builder.AddServiceBusClient(config.ConnectionString); + }); + + //TODO: evaluate programmatic topics/subscriptions/queues creation based on https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-management-libraries#azuremessagingservicebusadministration + + //https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements?tabs=net-standard-sdk-2#reusing-factories-and-clients + busConfigurator.Services + .AddSingleton() + .AddSingleton() + .AddSingleton() + .AddSingleton() + .AddSingleton(); + + builderFunc?.Invoke(new DefaultAzureServiceBusConfigurationBuilder(busConfigurator)); + + return busConfigurator; + } + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/AzureServiceBusSagaConfiguratorExtensions.cs b/src/OpenSleigh.Transport.AzureServiceBus/AzureServiceBusSagaConfiguratorExtensions.cs new file mode 100644 index 00000000..aecd0c3a --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/AzureServiceBusSagaConfiguratorExtensions.cs @@ -0,0 +1,24 @@ +using System.Diagnostics.CodeAnalysis; +using OpenSleigh.Core; +using OpenSleigh.Core.DependencyInjection; +using OpenSleigh.Core.Utils; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + [ExcludeFromCodeCoverage] + public static class AzureServiceBusSagaConfiguratorExtensions + { + public static ISagaConfigurator UseAzureServiceBusTransport(this ISagaConfigurator sagaConfigurator) + where TS : Saga + where TD : SagaState + { + var messageTypes = SagaUtils.GetHandledMessageTypes(); + foreach (var messageType in messageTypes) + sagaConfigurator.Services.AddBusSubscriber( + typeof(ServiceBusSubscriber<>).MakeGenericType(messageType)); + + return sagaConfigurator; + } + + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/HeaderNames.cs b/src/OpenSleigh.Transport.AzureServiceBus/HeaderNames.cs new file mode 100644 index 00000000..9e2d2268 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/HeaderNames.cs @@ -0,0 +1,7 @@ +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal static class HeaderNames + { + public const string MessageType = "message-type"; + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/IAzureServiceBusConfigurationBuilder.cs b/src/OpenSleigh.Transport.AzureServiceBus/IAzureServiceBusConfigurationBuilder.cs new file mode 100644 index 00000000..688a3b67 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/IAzureServiceBusConfigurationBuilder.cs @@ -0,0 +1,32 @@ +using System; +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.DependencyInjection; +using OpenSleigh.Core.DependencyInjection; +using OpenSleigh.Core.Messaging; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + public interface IAzureServiceBusConfigurationBuilder + { + void UseMessageNamingPolicy(QueueReferencesPolicy policy) where TM : IMessage; + } + + [ExcludeFromCodeCoverage] + internal class DefaultAzureServiceBusConfigurationBuilder : IAzureServiceBusConfigurationBuilder + { + private readonly IBusConfigurator _busConfigurator; + + public DefaultAzureServiceBusConfigurationBuilder(IBusConfigurator busConfigurator) + { + _busConfigurator = busConfigurator; + } + + public void UseMessageNamingPolicy(QueueReferencesPolicy policy) where TM : IMessage + { + if (policy == null) + throw new ArgumentNullException(nameof(policy)); + + _busConfigurator.Services.AddSingleton(policy); + } + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/IMessageParser.cs b/src/OpenSleigh.Transport.AzureServiceBus/IMessageParser.cs new file mode 100644 index 00000000..9143ca42 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/IMessageParser.cs @@ -0,0 +1,10 @@ +using System; +using OpenSleigh.Core.Messaging; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal interface IMessageParser + { + TM Resolve(BinaryData messageData) where TM : IMessage; + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/IQueueReferenceFactory.cs b/src/OpenSleigh.Transport.AzureServiceBus/IQueueReferenceFactory.cs new file mode 100644 index 00000000..50b6fedb --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/IQueueReferenceFactory.cs @@ -0,0 +1,9 @@ +using OpenSleigh.Core.Messaging; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + public interface IQueueReferenceFactory + { + QueueReferences Create() where TM : IMessage; + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/IServiceBusProcessorFactory.cs b/src/OpenSleigh.Transport.AzureServiceBus/IServiceBusProcessorFactory.cs new file mode 100644 index 00000000..be0599cb --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/IServiceBusProcessorFactory.cs @@ -0,0 +1,10 @@ +using Azure.Messaging.ServiceBus; +using OpenSleigh.Core.Messaging; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal interface IServiceBusProcessorFactory + { + ServiceBusProcessor Create() where TM : IMessage; + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/IServiceBusSenderFactory.cs b/src/OpenSleigh.Transport.AzureServiceBus/IServiceBusSenderFactory.cs new file mode 100644 index 00000000..5ea81f9b --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/IServiceBusSenderFactory.cs @@ -0,0 +1,10 @@ +using Azure.Messaging.ServiceBus; +using OpenSleigh.Core.Messaging; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal interface IServiceBusSenderFactory + { + ServiceBusSender Create(TM message = default) where TM : IMessage; + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/MessageParser.cs b/src/OpenSleigh.Transport.AzureServiceBus/MessageParser.cs new file mode 100644 index 00000000..f7b59f42 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/MessageParser.cs @@ -0,0 +1,24 @@ +using System; +using OpenSleigh.Core.Messaging; +using OpenSleigh.Core.Utils; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal class MessageParser : IMessageParser + { + private readonly ISerializer _decoder; + + public MessageParser(ISerializer encoder) + { + _decoder = encoder ?? throw new ArgumentNullException(nameof(encoder)); + } + + public TM Resolve(BinaryData messageData) where TM : IMessage + { + if (messageData is null) + throw new ArgumentNullException(nameof(messageData)); + + return (TM)_decoder.Deserialize(messageData, typeof(TM)); + } + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/OpenSleigh.Transport.AzureServiceBus.csproj b/src/OpenSleigh.Transport.AzureServiceBus/OpenSleigh.Transport.AzureServiceBus.csproj new file mode 100644 index 00000000..31450990 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/OpenSleigh.Transport.AzureServiceBus.csproj @@ -0,0 +1,36 @@ + + + + net5.0 + 0.1.0 + true + davidguida + OpenSleigh.Transport.AzureServiceBus + en-US + OpenSleigh.Transport.AzureServiceBus + Azure Service Bus transport for OpenSleigh. + Copyright 2021 + true + Apache-2.0 + saga saga-pattern dotnet-core csharp message-queue message-bus saga-state-persistence message-transport azure service-bus + ../../packages/ + https://github.com/mizrael/OpenSleigh/ + https://github.com/mizrael/OpenSleigh/ + true + true + snupkg + true + + + + + + + + + + + + + + diff --git a/src/OpenSleigh.Transport.AzureServiceBus/QueueReferenceFactory.cs b/src/OpenSleigh.Transport.AzureServiceBus/QueueReferenceFactory.cs new file mode 100644 index 00000000..470ffc76 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/QueueReferenceFactory.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using OpenSleigh.Core.Messaging; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + public class QueueReferenceFactory : IQueueReferenceFactory + { + private readonly ConcurrentDictionary _queueReferencesCache = new(); + private readonly Func _defaultCreator; + private readonly IServiceProvider _sp; + + public QueueReferenceFactory(IServiceProvider sp, Func defaultCreator = null) + { + _sp = sp ?? throw new ArgumentNullException(nameof(sp)); + + _defaultCreator = defaultCreator ?? (messageType => + { + var topicName = messageType.Name.ToLower(); + var subscriptionName = topicName + ".workers"; + return new QueueReferences(topicName, subscriptionName); + }); + } + + public QueueReferences Create() where TM : IMessage + => _queueReferencesCache.GetOrAdd(typeof(TM), k => CreateCore()); + + private QueueReferences CreateCore() + where TM : IMessage + { + var creator = _sp.GetService>(); + return (creator is null) ? _defaultCreator(typeof(TM)) : creator(); + } + } + + public delegate QueueReferences QueueReferencesPolicy() where TM : IMessage; +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/QueueReferences.cs b/src/OpenSleigh.Transport.AzureServiceBus/QueueReferences.cs new file mode 100644 index 00000000..feeec493 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/QueueReferences.cs @@ -0,0 +1,4 @@ +namespace OpenSleigh.Transport.AzureServiceBus +{ + public record QueueReferences(string TopicName, string SubscriptionName); +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/README.md b/src/OpenSleigh.Transport.AzureServiceBus/README.md new file mode 100644 index 00000000..6baeaa6b --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/README.md @@ -0,0 +1,50 @@ +# OpenSleigh.Transport.AzureServiceBus +![Nuget](https://img.shields.io/nuget/v/OpenSleigh.Transport.AzureServiceBus?style=plastic) + +## Description +Azure Service Bus Transport library for OpenSleigh + +## Installation +This library can be installed from Nuget: https://www.nuget.org/packages/OpenSleigh.Transport.AzureServiceBus/ + +## How-to + +The first thing to do is build an instance of `AzureServiceBusConfiguration` with the connection details. This can be done by reading the current app configuration. Once done, all you have to do is to call the `UseAzureServiceBusTransport` extension method: + +``` +services.AddOpenSleigh(cfg =>{ + var connStr = Configuration.GetConnectionString("AzureServiceBus"); + var config = new AzureServiceBusConfiguration(connStr); + cfg.UseAzureServiceBusTransport(config); + + // register the Persistence and the Sagas +}); +``` + +It is also possible to use a custom naming policy to define the names for topics and subscriptions. This opens the door to multiple, different scenarios (eg. a single topic with multiple subscriptions, one per message). + +``` +services.AddOpenSleigh(cfg =>{ + // code omitted + cfg.UseAzureServiceBusTransport(config, builder => + { + builder.UseMessageNamingPolicy(() => + new QueueReferences("my-topic", "start-parent-saga")); + builder.UseMessageNamingPolicy(() => + new QueueReferences("my-topic", "process-parent-saga")); + }) +}); + +``` + +If your application has to handle messages and events, not just dispatch them, you also have to configure each Saga : + +``` +services.AddOpenSleigh(cfg =>{ + // code omitted // + + cfg.AddSaga() + .UseStateFactory(msg => new MySagaState(msg.CorrelationId)) + .UseAzureServiceBusTransport(); +}); +``` \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusProcessorFactory.cs b/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusProcessorFactory.cs new file mode 100644 index 00000000..f6c605b0 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusProcessorFactory.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using OpenSleigh.Core.Messaging; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal class ServiceBusProcessorFactory : IAsyncDisposable, IServiceBusProcessorFactory + { + private readonly IQueueReferenceFactory _queueReferenceFactory; + private readonly ServiceBusClient _serviceBusClient; + private readonly ConcurrentDictionary _processors = new(); + + public ServiceBusProcessorFactory(IQueueReferenceFactory queueReferenceFactory, ServiceBusClient serviceBusClient) + { + _queueReferenceFactory = queueReferenceFactory ?? throw new ArgumentNullException(nameof(queueReferenceFactory)); + _serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient)); + } + + public ServiceBusProcessor Create() where TM : IMessage + { + var references = _queueReferenceFactory.Create(); + + var processor = _processors.GetOrAdd(references, _ => _serviceBusClient.CreateProcessor(references.TopicName, references.SubscriptionName)); + if (processor is null || processor.IsClosed) + processor = _processors[references] = _serviceBusClient.CreateProcessor(references.TopicName, references.SubscriptionName); + + return processor; + } + + public async ValueTask DisposeAsync() + { + foreach (var sender in _processors.Values) + await sender.CloseAsync().ConfigureAwait(false); + _processors.Clear(); + } + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusPublisher.cs b/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusPublisher.cs new file mode 100644 index 00000000..4044b1e3 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusPublisher.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Logging; +using OpenSleigh.Core.Messaging; +using OpenSleigh.Core.Utils; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal class ServiceBusPublisher : IPublisher + { + private readonly IServiceBusSenderFactory _senderFactory; + private readonly ISerializer _serializer; + private readonly ILogger _logger; + + public ServiceBusPublisher(IServiceBusSenderFactory senderFactory, + ISerializer serializer, + ILogger logger) + { + _senderFactory = senderFactory ?? throw new ArgumentNullException(nameof(senderFactory)); + _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) + { + if (message == null) + throw new ArgumentNullException(nameof(message)); + + return PublishAsyncCore(message, cancellationToken); + } + + private async Task PublishAsyncCore(IMessage message, CancellationToken cancellationToken) + { + ServiceBusSender sender = _senderFactory.Create((dynamic) message); + _logger.LogInformation( + $"publishing message '{message.Id}' to {sender.FullyQualifiedNamespace}/{sender.EntityPath}"); + + var serializedMessage = await _serializer.SerializeAsync(message, cancellationToken); + var busMessage = new ServiceBusMessage(serializedMessage) + { + CorrelationId = message.CorrelationId.ToString(), + MessageId = message.Id.ToString(), + ApplicationProperties = + { + {HeaderNames.MessageType, message.GetType().FullName} + } + }; + + await sender.SendMessageAsync(busMessage, cancellationToken).ConfigureAwait(false); + } + } +} diff --git a/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusSenderFactory.cs b/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusSenderFactory.cs new file mode 100644 index 00000000..46793ac3 --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusSenderFactory.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Concurrent; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using OpenSleigh.Core.Messaging; + +[assembly: InternalsVisibleTo("OpenSleigh.Transport.AzureServiceBus.Tests")] +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal class ServiceBusSenderFactory : IAsyncDisposable, IServiceBusSenderFactory + { + private readonly IQueueReferenceFactory _queueReferenceFactory; + private readonly ServiceBusClient _serviceBusClient; + private readonly ConcurrentDictionary _senders = new(); + + public ServiceBusSenderFactory(IQueueReferenceFactory queueReferenceFactory, ServiceBusClient serviceBusClient) + { + _queueReferenceFactory = queueReferenceFactory ?? throw new ArgumentNullException(nameof(queueReferenceFactory)); + _serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient)); + } + + public ServiceBusSender Create(TM message = default) where TM : IMessage + { + var references = _queueReferenceFactory.Create(); + + var sender = _senders.GetOrAdd(references, _ => _serviceBusClient.CreateSender(references.TopicName)); + if (sender is null || sender.IsClosed) + sender = _senders[references] = _serviceBusClient.CreateSender(references.TopicName); + + return sender; + } + + public async ValueTask DisposeAsync() + { + foreach (var sender in _senders.Values) + await sender.DisposeAsync(); + _senders.Clear(); + } + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusSubscriber.cs b/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusSubscriber.cs new file mode 100644 index 00000000..06330f9e --- /dev/null +++ b/src/OpenSleigh.Transport.AzureServiceBus/ServiceBusSubscriber.cs @@ -0,0 +1,72 @@ +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Logging; +using OpenSleigh.Core; +using OpenSleigh.Core.Messaging; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace OpenSleigh.Transport.AzureServiceBus +{ + internal sealed class ServiceBusSubscriber : ISubscriber, IDisposable + where TM : IMessage + { + private ServiceBusProcessor _processor; + private readonly IMessageProcessor _messageProcessor; + private readonly IMessageParser _messageParser; + private readonly ILogger> _logger; + + public ServiceBusSubscriber(IServiceBusProcessorFactory processorFactory, + IMessageParser messageParser, + IMessageProcessor messageProcessor, + ILogger> logger) + { + if (processorFactory == null) + throw new ArgumentNullException(nameof(processorFactory)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _messageParser = messageParser ?? throw new ArgumentNullException(nameof(messageParser)); + _messageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor)); + + _processor = processorFactory.Create(); + _processor.ProcessMessageAsync += MessageHandler; + _processor.ProcessErrorAsync += ErrorHandler; + } + + private Task ErrorHandler(ProcessErrorEventArgs args) + { + _logger.LogError(args.Exception, $"an error has occurred while processing messages: {args.Exception.Message}"); + return Task.CompletedTask; + } + + private async Task MessageHandler(ProcessMessageEventArgs args) + { + try + { + var message = _messageParser.Resolve(args.Message.Body); + + await _messageProcessor.ProcessAsync((dynamic)message, args.CancellationToken); + + await args.CompleteMessageAsync(args.Message).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, $"an error has occurred while processing message '{args.Message.MessageId}': {ex.Message}"); + if (args.Message.DeliveryCount > 3) + await args.DeadLetterMessageAsync(args.Message).ConfigureAwait(false); + else + await args.AbandonMessageAsync(args.Message).ConfigureAwait(false); + } + } + + public async Task StartAsync(CancellationToken cancellationToken = default) + => await _processor.StartProcessingAsync(cancellationToken).ConfigureAwait(false); + + public async Task StopAsync(CancellationToken cancellationToken = default) + => await _processor.StopProcessingAsync(cancellationToken).ConfigureAwait(false); + + public void Dispose() + { + _processor = null; + } + } +} \ No newline at end of file diff --git a/src/OpenSleigh.Transport.RabbitMQ/RabbitMQSagaConfiguratorExtensions.cs b/src/OpenSleigh.Transport.RabbitMQ/RabbitMQSagaConfiguratorExtensions.cs index 6a80cdf6..30ee404f 100644 --- a/src/OpenSleigh.Transport.RabbitMQ/RabbitMQSagaConfiguratorExtensions.cs +++ b/src/OpenSleigh.Transport.RabbitMQ/RabbitMQSagaConfiguratorExtensions.cs @@ -1,8 +1,7 @@ using System.Diagnostics.CodeAnalysis; -using System.Linq; -using Microsoft.Extensions.DependencyInjection; using OpenSleigh.Core; using OpenSleigh.Core.DependencyInjection; +using OpenSleigh.Core.Utils; namespace OpenSleigh.Transport.RabbitMQ { @@ -13,24 +12,11 @@ public static ISagaConfigurator UseRabbitMQTransport(this ISagaC where TS : Saga where TD : SagaState { - var sagaType = typeof(TS); - var messageHandlerType = typeof(IHandleMessage<>).GetGenericTypeDefinition(); - var interfaces = sagaType.GetInterfaces(); - foreach (var i in interfaces) - { - if (!i.IsGenericType) - continue; - - var openGeneric = i.GetGenericTypeDefinition(); - if (!openGeneric.IsAssignableFrom(messageHandlerType)) - continue; - - var messageType = i.GetGenericArguments().First(); - - sagaConfigurator.Services.AddSingleton(typeof(ISubscriber), + var messageTypes = SagaUtils.GetHandledMessageTypes(); + foreach(var messageType in messageTypes) + sagaConfigurator.Services.AddBusSubscriber( typeof(RabbitSubscriber<>).MakeGenericType(messageType)); - } - + return sagaConfigurator; } } diff --git a/tests/OpenSleigh.Core.Tests/Unit/Utils/SagaUtilsTests.cs b/tests/OpenSleigh.Core.Tests/Unit/Utils/SagaUtilsTests.cs new file mode 100644 index 00000000..92301cf8 --- /dev/null +++ b/tests/OpenSleigh.Core.Tests/Unit/Utils/SagaUtilsTests.cs @@ -0,0 +1,27 @@ +using FluentAssertions; +using OpenSleigh.Core.Tests.Sagas; +using OpenSleigh.Core.Utils; +using Xunit; + +namespace OpenSleigh.Core.Tests.Unit.Utils +{ + public class SagaUtilsTests + { + [Fact] + public void GetHandledMessageTypes_should_return_empty_collection_when_no_messages_handled() + { + var results = SagaUtils.GetHandledMessageTypes(); + results.Should().NotBeNull().And.BeEmpty(); + } + + [Fact] + public void GetHandledMessageTypes_should_return_handled_messages_types() + { + var results = SagaUtils.GetHandledMessageTypes(); + results.Should().NotBeNullOrEmpty().And.HaveCount(2); + results.Should().Contain(new[]{typeof(StartDummySaga), typeof(DummySagaStarted)}); + } + } + + internal class EmptySaga : Saga { } +} diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusEventBroadcastingScenario.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusEventBroadcastingScenario.cs new file mode 100644 index 00000000..4fa7dada --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusEventBroadcastingScenario.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Administration; +using OpenSleigh.Core.DependencyInjection; +using OpenSleigh.Core.Tests.E2E; +using OpenSleigh.Core.Tests.Sagas; +using OpenSleigh.Persistence.InMemory; +using OpenSleigh.Transport.AzureServiceBus.Tests.Fixtures; +using Xunit; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.E2E +{ + public class ServiceBusEventBroadcastingScenario : EventBroadcastingScenario, IClassFixture, IAsyncLifetime + { + private readonly ServiceBusFixture _fixture; + private readonly string _topicName; + private readonly string _subscriptionName; + + public ServiceBusEventBroadcastingScenario(ServiceBusFixture fixture) + { + _fixture = fixture; + _topicName = $"ServiceBusEventBroadcastingScenario.tests.{Guid.NewGuid()}"; + _subscriptionName = Guid.NewGuid().ToString(); + } + + protected override void ConfigureTransportAndPersistence(IBusConfigurator cfg) + { + cfg.UseAzureServiceBusTransport(_fixture.Configuration, builder => + { + QueueReferencesPolicy policy = () => new QueueReferences(_topicName, _subscriptionName); + builder.UseMessageNamingPolicy(policy); + }) + .UseInMemoryPersistence(); + } + + protected override void ConfigureSagaTransport(ISagaConfigurator cfg) => + cfg.UseAzureServiceBusTransport(); + + + public async Task InitializeAsync() + { + var adminClient = new ServiceBusAdministrationClient(_fixture.Configuration.ConnectionString); + + if (!(await adminClient.TopicExistsAsync(_topicName))) + await adminClient.CreateTopicAsync(_topicName); + + if (!(await adminClient.SubscriptionExistsAsync(_topicName, _subscriptionName))) + await adminClient.CreateSubscriptionAsync(_topicName, _subscriptionName); + } + + public async Task DisposeAsync() + { + var adminClient = new ServiceBusAdministrationClient(_fixture.Configuration.ConnectionString); + + await adminClient.DeleteSubscriptionAsync(_topicName, _subscriptionName); + await adminClient.DeleteTopicAsync(_topicName); + } + } +} \ No newline at end of file diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusParentChildScenario.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusParentChildScenario.cs new file mode 100644 index 00000000..2ee0bba9 --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusParentChildScenario.cs @@ -0,0 +1,77 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Administration; +using OpenSleigh.Core.DependencyInjection; +using OpenSleigh.Core.Tests.E2E; +using OpenSleigh.Core.Tests.Sagas; +using OpenSleigh.Persistence.InMemory; +using OpenSleigh.Transport.AzureServiceBus.Tests.Fixtures; +using Xunit; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.E2E +{ + public class ServiceBusParentChildScenario : ParentChildScenario, IClassFixture, IAsyncLifetime + { + private readonly ServiceBusFixture _fixture; + private readonly string _topicName; + private readonly Dictionary _subscriptions = new(); + + public ServiceBusParentChildScenario(ServiceBusFixture fixture) + { + _fixture = fixture; + + _topicName = $"ServiceBusParentChildScenario.tests.{Guid.NewGuid()}"; + + _subscriptions[typeof(StartParentSaga)] = Guid.NewGuid().ToString(); + _subscriptions[typeof(ProcessParentSaga)] = Guid.NewGuid().ToString(); + _subscriptions[typeof(ParentSagaCompleted)] = Guid.NewGuid().ToString(); + _subscriptions[typeof(StartChildSaga)] = Guid.NewGuid().ToString(); + _subscriptions[typeof(ProcessChildSaga)] = Guid.NewGuid().ToString(); + _subscriptions[typeof(ChildSagaCompleted)] = Guid.NewGuid().ToString(); + } + + protected override void ConfigureTransportAndPersistence(IBusConfigurator cfg) + { + cfg.UseAzureServiceBusTransport(_fixture.Configuration, builder => + { + builder.UseMessageNamingPolicy(() => + new QueueReferences(_topicName, _subscriptions[typeof(StartParentSaga)])); + builder.UseMessageNamingPolicy(() => + new QueueReferences(_topicName, _subscriptions[typeof(ProcessParentSaga)])); + builder.UseMessageNamingPolicy(() => + new QueueReferences(_topicName, _subscriptions[typeof(ParentSagaCompleted)])); + builder.UseMessageNamingPolicy(() => + new QueueReferences(_topicName, _subscriptions[typeof(StartChildSaga)])); + builder.UseMessageNamingPolicy(() => + new QueueReferences(_topicName, _subscriptions[typeof(ProcessChildSaga)])); + builder.UseMessageNamingPolicy(() => + new QueueReferences(_topicName, _subscriptions[typeof(ChildSagaCompleted)])); + }) + .UseInMemoryPersistence(); + } + + protected override void ConfigureSagaTransport(ISagaConfigurator cfg) => + cfg.UseAzureServiceBusTransport(); + + public async Task InitializeAsync() + { + var adminClient = new ServiceBusAdministrationClient(_fixture.Configuration.ConnectionString); + + if (!(await adminClient.TopicExistsAsync(_topicName))) + await adminClient.CreateTopicAsync(_topicName); + + foreach (var val in _subscriptions.Values) + if (!(await adminClient.SubscriptionExistsAsync(_topicName, val))) + await adminClient.CreateSubscriptionAsync(_topicName, val); + } + + public async Task DisposeAsync() + { + var adminClient = new ServiceBusAdministrationClient(_fixture.Configuration.ConnectionString); + foreach (var val in _subscriptions.Values) + await adminClient.DeleteSubscriptionAsync(_topicName, val); + await adminClient.DeleteTopicAsync(_topicName); + } + } +} diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusSimpleSagaScenario.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusSimpleSagaScenario.cs new file mode 100644 index 00000000..f702d5e0 --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/E2E/ServiceBusSimpleSagaScenario.cs @@ -0,0 +1,62 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Administration; +using Microsoft.Extensions.Hosting; +using OpenSleigh.Core.DependencyInjection; +using OpenSleigh.Core.Tests.E2E; +using OpenSleigh.Core.Tests.Sagas; +using OpenSleigh.Persistence.InMemory; +using OpenSleigh.Transport.AzureServiceBus.Tests.Fixtures; +using Xunit; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.E2E +{ + public class ServiceBusSimpleSagaScenario : SimpleSagaScenario, + IClassFixture, + IAsyncLifetime + { + private readonly ServiceBusFixture _fixture; + private readonly string _topicName; + private readonly string _subscriptionName; + + public ServiceBusSimpleSagaScenario(ServiceBusFixture fixture) + { + _fixture = fixture; + + var messageName = nameof(StartSimpleSaga).ToLower(); + _topicName = $"{messageName}.tests.{Guid.NewGuid()}"; + _subscriptionName = $"{messageName}.workers"; + } + + protected override void ConfigureTransportAndPersistence(IBusConfigurator cfg) + { + cfg.UseAzureServiceBusTransport(_fixture.Configuration, builder => + { + QueueReferencesPolicy policy = () => new QueueReferences(_topicName, _subscriptionName); + builder.UseMessageNamingPolicy(policy); + }) + .UseInMemoryPersistence(); + } + + protected override void ConfigureSagaTransport(ISagaConfigurator cfg) => + cfg.UseAzureServiceBusTransport(); + + public async Task InitializeAsync() + { + var adminClient = new ServiceBusAdministrationClient(_fixture.Configuration.ConnectionString); + + if (!(await adminClient.TopicExistsAsync(_topicName))) + await adminClient.CreateTopicAsync(_topicName); + + if (!(await adminClient.SubscriptionExistsAsync(_topicName, _subscriptionName))) + await adminClient.CreateSubscriptionAsync(_topicName, _subscriptionName); + } + + public async Task DisposeAsync() + { + var adminClient = new ServiceBusAdministrationClient(_fixture.Configuration.ConnectionString); + await adminClient.DeleteSubscriptionAsync(_topicName, _subscriptionName); + await adminClient.DeleteTopicAsync(_topicName); + } + } +} diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Fixtures/ServiceBusFixture.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Fixtures/ServiceBusFixture.cs new file mode 100644 index 00000000..f31ea8bd --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Fixtures/ServiceBusFixture.cs @@ -0,0 +1,25 @@ +using System; +using Microsoft.Extensions.Configuration; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.Fixtures +{ + public class ServiceBusFixture + { + public ServiceBusFixture() + { + var configuration = new ConfigurationBuilder() + .AddJsonFile("appsettings.json", optional: false, reloadOnChange: false) + .AddUserSecrets() + .AddEnvironmentVariables() + .Build(); + + var connectionString = configuration.GetConnectionString("AzureServiceBus"); + if (string.IsNullOrWhiteSpace(connectionString)) + throw new ArgumentException("missing Service Bus connection"); + + this.Configuration = new AzureServiceBusConfiguration(connectionString); + } + + public AzureServiceBusConfiguration Configuration { get; init; } + } +} \ No newline at end of file diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/OpenSleigh.Transport.AzureServiceBus.Tests.csproj b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/OpenSleigh.Transport.AzureServiceBus.Tests.csproj new file mode 100644 index 00000000..771364fa --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/OpenSleigh.Transport.AzureServiceBus.Tests.csproj @@ -0,0 +1,49 @@ + + + + net5.0 + + false + + aa412272-6aa3-4522-adfb-989250abf95e + + + + + + + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + + Always + + + + diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/DummyMessage.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/DummyMessage.cs new file mode 100644 index 00000000..77355430 --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/DummyMessage.cs @@ -0,0 +1,10 @@ +using System; +using OpenSleigh.Core.Messaging; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.Unit +{ + public record DummyMessage(Guid Id, Guid CorrelationId) : ICommand + { + public static DummyMessage New() => new DummyMessage(Guid.NewGuid(), Guid.NewGuid()); + } +} \ No newline at end of file diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/MessageParserTests.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/MessageParserTests.cs new file mode 100644 index 00000000..b3fc0041 --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/MessageParserTests.cs @@ -0,0 +1,48 @@ +using System; +using System.Text; +using Azure.Messaging.ServiceBus; +using FluentAssertions; +using NSubstitute; +using OpenSleigh.Core.Utils; +using Xunit; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.Unit +{ + public class MessageParserTests + { + [Fact] + public void ctor_should_throw_when_argument_null() + { + Assert.Throws(() => new MessageParser(null)); + } + + [Fact] + public void Resolve_should_throw_when_input_null() + { + var encoder = NSubstitute.Substitute.For(); + var sut = new MessageParser(encoder); + Assert.Throws(() => sut.Resolve(null)); + } + + [Fact] + public void Resolve_should_deserialize_message() + { + var expectedMessage = DummyMessage.New(); + var messageJson = Newtonsoft.Json.JsonConvert.SerializeObject(expectedMessage); + var messageBytes = Encoding.UTF8.GetBytes(messageJson); + var messageData = new BinaryData(messageBytes); + + var encoder = NSubstitute.Substitute.For(); + encoder.Deserialize(messageData, typeof(DummyMessage)) + .Returns(expectedMessage); + + var sut = new MessageParser(encoder); + + var result = sut.Resolve(messageData); + + result.Should().Be(expectedMessage); + encoder.Received(1) + .Deserialize(messageData, typeof(DummyMessage)); + } + } +} \ No newline at end of file diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/QueueReferenceFactoryTests.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/QueueReferenceFactoryTests.cs new file mode 100644 index 00000000..3c36ba4c --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/QueueReferenceFactoryTests.cs @@ -0,0 +1,79 @@ +using System; +using FluentAssertions; +using NSubstitute; +using Xunit; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.Unit +{ + public class QueueReferenceFactoryTests + { + [Fact] + public void Create_should_use_default_creator_when_none_defined() + { + var sp = NSubstitute.Substitute.For(); + var sut = new QueueReferenceFactory(sp, messageType => + { + var TopicName = messageType.Name.ToLower(); + var SubscriptionName = TopicName + ".a"; + var dlTopicName = TopicName + ".b"; + var dlSubscriptionName = dlTopicName + ".c"; + return new QueueReferences(TopicName, SubscriptionName); + }); + + var result = sut.Create(); + result.Should().NotBeNull(); + result.TopicName.Should().Be("dummymessage"); + result.SubscriptionName.Should().Be("dummymessage.a"); + } + + [Fact] + public void Create_should_use_registered_creator() + { + var sp = NSubstitute.Substitute.For(); + + var policy = new QueueReferencesPolicy(() => + { + var TopicName = "dummy"; + var SubscriptionName = TopicName + ".a"; + return new QueueReferences(TopicName, SubscriptionName); + }); + sp.GetService(typeof(QueueReferencesPolicy)) + .Returns(policy); + var sut = new QueueReferenceFactory(sp); + + var result = sut.Create(); + result.Should().NotBeNull(); + result.TopicName.Should().Be("dummy"); + result.SubscriptionName.Should().Be("dummy.a"); + } + + [Fact] + public void Create_should_return_valid_references() + { + var sp = NSubstitute.Substitute.For(); + var sut = new QueueReferenceFactory(sp); + + var result = sut.Create(); + result.Should().NotBeNull(); + result.TopicName.Should().Be("dummymessage"); + result.SubscriptionName.Should().Be("dummymessage.workers"); + } + + [Fact] + public void Create_generic_should_return_valid_references() + { + var sp = NSubstitute.Substitute.For(); + var sut = new QueueReferenceFactory(sp); + var result = sut.Create(); + result.Should().NotBeNull(); + result.TopicName.Should().Be("dummymessage"); + result.SubscriptionName.Should().Be("dummymessage.workers"); + } + + [Fact] + public void ctor_should_throw_if_service_provider_null() + { + Assert.Throws(() => new QueueReferenceFactory(null)); + } + } +} diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/ServiceBusProcessorFactoryTests.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/ServiceBusProcessorFactoryTests.cs new file mode 100644 index 00000000..2d1d62b1 --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/ServiceBusProcessorFactoryTests.cs @@ -0,0 +1,67 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using FluentAssertions; +using NSubstitute; +using NSubstitute.ReturnsExtensions; +using Xunit; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.Unit +{ + public class ServiceBusProcessorFactoryTests + { + [Fact] + public void ctor_should_throw_when_argument_null() + { + var serviceBusClient = NSubstitute.Substitute.ForPartsOf(); + var factory = NSubstitute.Substitute.For(); + Assert.Throws(() => new ServiceBusProcessorFactory(null, serviceBusClient)); + Assert.Throws(() => new ServiceBusProcessorFactory(factory, null)); + } + + [Fact] + public void Create_should_return_Processor() + { + var serviceBusClient = NSubstitute.Substitute.ForPartsOf(); + var processor = NSubstitute.Substitute.ForPartsOf(); + serviceBusClient.WhenForAnyArgs(c => c.CreateProcessor(Arg.Any(), Arg.Any())) + .DoNotCallBase(); + serviceBusClient.CreateProcessor(Arg.Any(), Arg.Any()) + .ReturnsForAnyArgs(processor); + + var factory = NSubstitute.Substitute.For(); + var references = new QueueReferences("lorem", "ipsum"); + factory.Create() + .Returns(references); + + var sut = new ServiceBusProcessorFactory(factory, serviceBusClient); + var result = sut.Create(); + result.Should().Be(processor); + + serviceBusClient.Received(1) + .CreateProcessor(references.TopicName, references.SubscriptionName); + } + + [Fact] + public void Create_should_recreate_Processor_when_null() + { + var serviceBusClient = NSubstitute.Substitute.ForPartsOf(); + + serviceBusClient.WhenForAnyArgs(c => c.CreateProcessor(Arg.Any(), Arg.Any())) + .DoNotCallBase(); + serviceBusClient.CreateProcessor(Arg.Any(), Arg.Any()) + .ReturnsNullForAnyArgs(); + + var factory = NSubstitute.Substitute.For(); + var references = new QueueReferences("lorem", "ipsum"); + factory.Create() + .Returns(references); + + var sut = new ServiceBusProcessorFactory(factory, serviceBusClient); + sut.Create(); + + serviceBusClient.Received(2) + .CreateProcessor(references.TopicName, references.SubscriptionName); + } + } +} diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/ServiceBusSenderFactoryTests.cs b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/ServiceBusSenderFactoryTests.cs new file mode 100644 index 00000000..d5f4cf5c --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/Unit/ServiceBusSenderFactoryTests.cs @@ -0,0 +1,65 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using FluentAssertions; +using NSubstitute; +using NSubstitute.ReturnsExtensions; +using Xunit; + +namespace OpenSleigh.Transport.AzureServiceBus.Tests.Unit +{ + public class ServiceBusSenderFactoryTests + { + [Fact] + public void ctor_should_throw_when_argument_null() + { + var serviceBusClient = NSubstitute.Substitute.ForPartsOf(); + var factory = NSubstitute.Substitute.For(); + Assert.Throws(() => new ServiceBusSenderFactory(null, serviceBusClient)); + Assert.Throws(() => new ServiceBusSenderFactory(factory, null)); + } + + [Fact] + public void Create_should_return_sender() + { + var serviceBusClient = NSubstitute.Substitute.ForPartsOf(); + var sender = NSubstitute.Substitute.ForPartsOf(); + serviceBusClient.WhenForAnyArgs(c => c.CreateSender(Arg.Any())) + .DoNotCallBase(); + serviceBusClient.CreateSender(Arg.Any()).ReturnsForAnyArgs(sender); + + var factory = NSubstitute.Substitute.For(); + var references = new QueueReferences("lorem", "ipsum"); + factory.Create() + .Returns(references); + + var sut = new ServiceBusSenderFactory(factory, serviceBusClient); + var result = sut.Create(); + result.Should().Be(sender); + + serviceBusClient.Received(1) + .CreateSender(references.TopicName); + } + + [Fact] + public void Create_should_recreate_sender_when_null() + { + var serviceBusClient = NSubstitute.Substitute.ForPartsOf(); + + serviceBusClient.WhenForAnyArgs(c => c.CreateSender(Arg.Any())) + .DoNotCallBase(); + serviceBusClient.CreateSender(Arg.Any()).ReturnsNullForAnyArgs(); + + var factory = NSubstitute.Substitute.For(); + var references = new QueueReferences("lorem", "ipsum"); + factory.Create() + .Returns(references); + + var sut = new ServiceBusSenderFactory(factory, serviceBusClient); + sut.Create(); + + serviceBusClient.Received(2) + .CreateSender(references.TopicName); + } + } +} diff --git a/tests/OpenSleigh.Transport.AzureServiceBus.Tests/appsettings.json b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/appsettings.json new file mode 100644 index 00000000..cc5fe2b5 --- /dev/null +++ b/tests/OpenSleigh.Transport.AzureServiceBus.Tests/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Trace", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + } +}