Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for Azure Service Bus #25

Merged
merged 17 commits into from
Feb 9, 2021
14 changes: 14 additions & 0 deletions OpenSleigh.sln
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.SQL"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.SQL.Tests", "tests\OpenSleigh.Persistence.SQL.Tests\OpenSleigh.Persistence.SQL.Tests.csproj", "{0799A41D-483D-412C-BD10-E2BB3FB907E4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Transport.AzureServiceBus", "src\OpenSleigh.Transport.AzureServiceBus\OpenSleigh.Transport.AzureServiceBus.csproj", "{CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenSleigh.Transport.AzureServiceBus.Tests", "tests\OpenSleigh.Transport.AzureServiceBus.Tests\OpenSleigh.Transport.AzureServiceBus.Tests.csproj", "{483B93A9-3650-4823-ADE2-CA4A1B71EEF6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -73,6 +77,14 @@ Global
{0799A41D-483D-412C-BD10-E2BB3FB907E4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0799A41D-483D-412C-BD10-E2BB3FB907E4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0799A41D-483D-412C-BD10-E2BB3FB907E4}.Release|Any CPU.Build.0 = Release|Any CPU
{CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CA4B0C5D-200A-4D3D-8C5A-D0910E11585D}.Release|Any CPU.Build.0 = Release|Any CPU
{483B93A9-3650-4823-ADE2-CA4A1B71EEF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{483B93A9-3650-4823-ADE2-CA4A1B71EEF6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{483B93A9-3650-4823-ADE2-CA4A1B71EEF6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{483B93A9-3650-4823-ADE2-CA4A1B71EEF6}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -86,6 +98,8 @@ Global
{2E5F3E44-A3E7-429A-81BB-FC907F0D5377} = {86CDC8FD-5E6F-4F45-A073-9F2749192582}
{5B2D9A6B-4ECA-4F73-B654-AEF16485935F} = {5594CC89-F905-46B2-B938-27B8050D9CA3}
{0799A41D-483D-412C-BD10-E2BB3FB907E4} = {5594CC89-F905-46B2-B938-27B8050D9CA3}
{CA4B0C5D-200A-4D3D-8C5A-D0910E11585D} = {86CDC8FD-5E6F-4F45-A073-9F2749192582}
{483B93A9-3650-4823-ADE2-CA4A1B71EEF6} = {86CDC8FD-5E6F-4F45-A073-9F2749192582}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D5297242-16B4-43D7-B329-362EBCE2A5A5}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using OpenSleigh.Core.BackgroundServices;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Core.Utils;
Expand Down Expand Up @@ -41,6 +42,13 @@ public static IServiceCollection AddOpenSleigh(this IServiceCollection services,

return services;
}

public static IServiceCollection AddBusSubscriber(this IServiceCollection services, Type subscriberType)
{
if (!services.Any(s => s.ImplementationType == subscriberType))
services.AddSingleton(typeof(ISubscriber), subscriberType);
return services;
}
}

}
2 changes: 1 addition & 1 deletion src/OpenSleigh.Core/OpenSleigh.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>0.9.2</PackageVersion>
<PackageVersion>0.10.0</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh</Product>
Expand Down
30 changes: 30 additions & 0 deletions src/OpenSleigh.Core/Utils/SagaUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Linq;

namespace OpenSleigh.Core.Utils
{
public static class SagaUtils<TS, TD>
where TS : Saga<TD>
where TD : SagaState
{
public static IEnumerable<Type> GetHandledMessageTypes()
{
var sagaType = typeof(TS);
var messageHandlerType = typeof(IHandleMessage<>).GetGenericTypeDefinition();
var interfaces = sagaType.GetInterfaces();
foreach (var i in interfaces)
{
if (!i.IsGenericType)
continue;

var openGeneric = i.GetGenericTypeDefinition();
if (!openGeneric.IsAssignableFrom(messageHandlerType))
continue;

var messageType = i.GetGenericArguments().First();
yield return messageType;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Reflection;
using System.Threading.Channels;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Core.Utils;
using OpenSleigh.Persistence.InMemory.Messaging;

namespace OpenSleigh.Persistence.InMemory
Expand All @@ -14,28 +15,17 @@ namespace OpenSleigh.Persistence.InMemory
public static class InMemorySagaConfiguratorExtensions
{
private static readonly MethodInfo RawRegisterMessageMethod = typeof(InMemorySagaConfiguratorExtensions)
.GetMethod("RegisterMessage", BindingFlags.Static | BindingFlags.NonPublic);
.GetMethod(nameof(RegisterMessage), BindingFlags.Static | BindingFlags.NonPublic);

public static ISagaConfigurator<TS, TD> UseInMemoryTransport<TS, TD>(this ISagaConfigurator<TS, TD> sagaConfigurator)
where TS : Saga<TD>
where TD : SagaState
{
var sagaType = typeof(TS);
var messageHandlerType = typeof(IHandleMessage<>).GetGenericTypeDefinition();
var interfaces = sagaType.GetInterfaces();
foreach (var i in interfaces)
var messageTypes = SagaUtils<TS, TD>.GetHandledMessageTypes();
foreach (var messageType in messageTypes)
{
if (!i.IsGenericType)
continue;

var openGeneric = i.GetGenericTypeDefinition();
if (!openGeneric.IsAssignableFrom(messageHandlerType))
continue;

var messageType = i.GetGenericArguments().First();

var registerMessageMethod = RawRegisterMessageMethod.MakeGenericMethod(messageType);
registerMessageMethod.Invoke(null, new[] {sagaConfigurator.Services});
registerMessageMethod.Invoke(null, new[] { sagaConfigurator.Services });
}

return sagaConfigurator;
Expand All @@ -55,7 +45,7 @@ private static void RegisterMessage<TM>(IServiceCollection services) where TM :
{
var channel = ctx.GetService<Channel<TM>>();
return channel?.Writer;
}).AddSingleton<ISubscriber, InMemorySubscriber<TM>>();
}).AddBusSubscriber(typeof(InMemorySubscriber<TM>));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Messaging;
using Microsoft.Extensions.Azure;

namespace OpenSleigh.Transport.AzureServiceBus
{
[ExcludeFromCodeCoverage]
public record AzureServiceBusConfiguration(string ConnectionString);

[ExcludeFromCodeCoverage]
public static class AzureServiceBusConfiguratorExtensions
{
public static IBusConfigurator UseAzureServiceBusTransport(this IBusConfigurator busConfigurator,
AzureServiceBusConfiguration config,
Action<IAzureServiceBusConfigurationBuilder> builderFunc = null)
{
busConfigurator.Services.AddAzureClients(builder =>
{
builder.AddServiceBusClient(config.ConnectionString);
});

//TODO: evaluate programmatic topics/subscriptions/queues creation based on https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-management-libraries#azuremessagingservicebusadministration

//https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements?tabs=net-standard-sdk-2#reusing-factories-and-clients
busConfigurator.Services
.AddSingleton<IQueueReferenceFactory, QueueReferenceFactory>()
.AddSingleton<IServiceBusSenderFactory, ServiceBusSenderFactory>()
.AddSingleton<IServiceBusProcessorFactory, ServiceBusProcessorFactory>()
.AddSingleton<IMessageParser, MessageParser>()
.AddSingleton<IPublisher, ServiceBusPublisher>();

builderFunc?.Invoke(new DefaultAzureServiceBusConfigurationBuilder(busConfigurator));

return busConfigurator;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Diagnostics.CodeAnalysis;
using OpenSleigh.Core;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Utils;

namespace OpenSleigh.Transport.AzureServiceBus
{
[ExcludeFromCodeCoverage]
public static class AzureServiceBusSagaConfiguratorExtensions
{
public static ISagaConfigurator<TS, TD> UseAzureServiceBusTransport<TS, TD>(this ISagaConfigurator<TS, TD> sagaConfigurator)
where TS : Saga<TD>
where TD : SagaState
{
var messageTypes = SagaUtils<TS, TD>.GetHandledMessageTypes();
foreach (var messageType in messageTypes)
sagaConfigurator.Services.AddBusSubscriber(
typeof(ServiceBusSubscriber<>).MakeGenericType(messageType));

return sagaConfigurator;
}

}
}
7 changes: 7 additions & 0 deletions src/OpenSleigh.Transport.AzureServiceBus/HeaderNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace OpenSleigh.Transport.AzureServiceBus
{
internal static class HeaderNames
{
public const string MessageType = "message-type";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.AzureServiceBus
{
public interface IAzureServiceBusConfigurationBuilder
{
void UseMessageNamingPolicy<TM>(QueueReferencesPolicy<TM> policy) where TM : IMessage;
}

[ExcludeFromCodeCoverage]
internal class DefaultAzureServiceBusConfigurationBuilder : IAzureServiceBusConfigurationBuilder
{
private readonly IBusConfigurator _busConfigurator;

public DefaultAzureServiceBusConfigurationBuilder(IBusConfigurator busConfigurator)
{
_busConfigurator = busConfigurator;
}

public void UseMessageNamingPolicy<TM>(QueueReferencesPolicy<TM> policy) where TM : IMessage
{
if (policy == null)
throw new ArgumentNullException(nameof(policy));

_busConfigurator.Services.AddSingleton(policy);
}
}
}
10 changes: 10 additions & 0 deletions src/OpenSleigh.Transport.AzureServiceBus/IMessageParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.AzureServiceBus
{
internal interface IMessageParser
{
TM Resolve<TM>(BinaryData messageData) where TM : IMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.AzureServiceBus
{
public interface IQueueReferenceFactory
{
QueueReferences Create<TM>() where TM : IMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Azure.Messaging.ServiceBus;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.AzureServiceBus
{
internal interface IServiceBusProcessorFactory
{
ServiceBusProcessor Create<TM>() where TM : IMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Azure.Messaging.ServiceBus;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.AzureServiceBus
{
internal interface IServiceBusSenderFactory
{
ServiceBusSender Create<TM>(TM message = default) where TM : IMessage;
}
}
24 changes: 24 additions & 0 deletions src/OpenSleigh.Transport.AzureServiceBus/MessageParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Core.Utils;

namespace OpenSleigh.Transport.AzureServiceBus
{
internal class MessageParser : IMessageParser
{
private readonly ISerializer _decoder;

public MessageParser(ISerializer encoder)
{
_decoder = encoder ?? throw new ArgumentNullException(nameof(encoder));
}

public TM Resolve<TM>(BinaryData messageData) where TM : IMessage
{
if (messageData is null)
throw new ArgumentNullException(nameof(messageData));

return (TM)_decoder.Deserialize(messageData, typeof(TM));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>0.1.0</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Transport.AzureServiceBus</Product>
<NeutralLanguage>en-US</NeutralLanguage>
<Title>OpenSleigh.Transport.AzureServiceBus</Title>
<PackageDescription>Azure Service Bus transport for OpenSleigh.</PackageDescription>
<Copyright>Copyright 2021</Copyright>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<PackageTags>saga saga-pattern dotnet-core csharp message-queue message-bus saga-state-persistence message-transport azure service-bus</PackageTags>
<PackageOutputPath>../../packages/</PackageOutputPath>
<RepositoryUrl>https://github.com/mizrael/OpenSleigh/</RepositoryUrl>
<PackageProjectUrl>https://github.com/mizrael/OpenSleigh/</PackageProjectUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.0.1" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.0.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\OpenSleigh.Core\OpenSleigh.Core.csproj" />
</ItemGroup>

</Project>
Loading