Skip to content

Commit

Permalink
Consumer for ServiceBus works
Browse files Browse the repository at this point in the history
  • Loading branch information
HubiBoar committed Jul 22, 2024
1 parent cfa4f43 commit 1349900
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 55 deletions.
62 changes: 27 additions & 35 deletions src/FeatureSlice.FluentServiceBus/ServiceBusMessaging.cs
Original file line number Diff line number Diff line change
@@ -1,98 +1,90 @@
using FluentServiceBus;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Momolith.Modules;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Definit.Results;

namespace FeatureSlice.FluentServiceBus;

public sealed class ServiceBusMessaging<TRequest> : IConsumerDispatcher<TRequest>
where TRequest : notnull
public sealed class ServiceBusMessaging : IConsumerDispatcher
{
private IRouterPublisher Publisher { get; set; } = null!;
private readonly IServiceBusBuilder _builder;
private readonly ServiceBusClient _client;
private readonly ServiceBusAdministrationClient _admin;
private readonly List<Func<IHost, Task>> _hostExtensions;
private readonly List<Action> _publisherExtensions;

private ServiceBusMessaging(IServiceBusBuilder builder, IServiceCollection services, IHostExtender extender, ServiceBusClient client, ServiceBusAdministrationClient admin)
private ServiceBusMessaging(IServiceBusBuilder builder, IServiceCollection services, ServiceBusClient client, ServiceBusAdministrationClient admin)
{
_builder = builder;
_client = client;
_admin = admin;
_hostExtensions = [];
extender.ExtendAsync(Build);
services.AddSingleton<IRouterPublisher>(_ => Publisher);
_publisherExtensions = [];
services.AddFeatureSlicesExtension<IHost>((host, provider) => provider.GetRequiredService<Task<IRouterPublisher>>());
services.AddSingleton<Task<IRouterPublisher>>(_ => Build());
}

public static IConsumerSetup Create(
public static void Register(
IServiceBusBuilder builder,
IServiceCollection services,
IHostExtender extender,
ServiceBusClient client,
ServiceBusAdministrationClient admin)
{
return new ServiceBusMessaging(builder, services, extender, client, admin);
services.AddSingleton<IConsumerDispatcher>(new ServiceBusMessaging(builder, services, client, admin));
}

public static IConsumerSetup Create(
public static void Create(
IServiceCollection services,
IHostExtender extender,
ServiceBusClient client,
ServiceBusAdministrationClient admin)
{
return new ServiceBusMessaging(new ServiceBusBuilder(), services, extender, client, admin);
services.AddSingleton<IConsumerDispatcher>(new ServiceBusMessaging(new ServiceBusBuilder(), services, client, admin));
}

public Dispatch<TRequest, Result, Success> GetDispatcher
public Dispatch<TRequest, Result, Success> GetDispatcher<TRequest>
(
ConsumerName consumerName,
IServiceProvider provider,
Dispatch<TRequest, Result, Success> dispatch
)
where TRequest : notnull
{
var queueName = PathConverter.ToQueueName(consumerName.Name);

_hostExtensions.Add(host => {
var consumer = consumerFactory(host.Services);

_publisherExtensions.Add(() => {
_builder
.AddQueue(queueName)
.WithConsumer<TMessage>(message => Consume(message, consumer));

return Task.CompletedTask;
.WithConsumer<TRequest>(Consume);
});

return provider => message => Dispatch(provider, message, queueName);
return Dispatch;

static async Task<Result> Dispatch(IServiceProvider provider, TMessage message, QueueName queueName)
async Task<Result> Dispatch(TRequest message)
{
var publisher = provider.GetRequiredService<IRouterPublisher>();
var publisher = await provider.GetRequiredService<Task<IRouterPublisher>>();
await publisher.Publish(message, queueName.Value);

return Result.Success;
}

static async Task<Result.Or<Abandon>> Consume(TMessage message, IConsumerSetup.Consume<TMessage> consume)
async Task<Result.Or<Abandon>> Consume(TRequest message)
{
var result = await consume(message);
var result = await dispatch(message);

return result.Match<Result.Or<Abandon>>(
success => success,
abandon => new Abandon(),
return result.Match(
success => Result.Or<Abandon>.Success,
error => error);
}
}

private async Task Build(IHost host)
private async Task<IRouterPublisher> Build()
{
foreach(var extension in _hostExtensions)
foreach(var extension in _publisherExtensions)
{
await extension(host);
extension();
}

var built = await _builder.BuildRouterWithStore(_client, _admin);
Publisher = built.Router;
return (await _builder.BuildRouterWithStore(_client, _admin)).Router;
}

}
2 changes: 0 additions & 2 deletions src/FeatureSlice/Endpoint/Endpoint.Builder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ public interface IEndpointBuilder
public void Extend(Action<RouteHandlerBuilder> builder);
}

public sealed record EndpointMapper(Action<IEndpointRouteBuilder> Map);

public sealed record EndpointBuilder<TRequest, TResult, TResponse>
(
HttpMethod Method,
Expand Down
19 changes: 5 additions & 14 deletions src/FeatureSlice/Endpoint/Endpoint.Map.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
using Definit.Results;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;

namespace FeatureSlice;

public static class FeatureSliceEndpointExtensions
{
public static void MapFeatureSlices(this IEndpointRouteBuilder endpointRoute)
{
var services = endpointRoute.ServiceProvider.GetServices<EndpointMapper>();

foreach(var service in services)
{
service.Map(endpointRoute);
}
}

public static FeatureSliceBase<TRequest, TResult, TResponse>.ISetup Map<TSelf, TRequest, TResult, TResponse>
(
this FeatureSliceBase<TSelf, TRequest, TResult, TResponse>.HandleSetup options,
Expand All @@ -33,12 +22,12 @@ Func<EndpointBuilder<TRequest, TResult, TResponse>, RouteHandlerBuilder> builder
{
services.AddSwaggerGen(options => options.SetCustomSchemaId());

services.AddSingleton(new EndpointMapper(route =>
services.AddFeatureSlicesExtension<WebApplication>((host, provider) =>
{
var endpoint = new EndpointBuilder<TRequest, TResult, TResponse>
(
method,
route,
host,
provider =>
request =>
provider
Expand All @@ -54,7 +43,9 @@ Func<EndpointBuilder<TRequest, TResult, TResponse>, RouteHandlerBuilder> builder
{
extension(handler);
}
}));

return Task.CompletedTask;
});
});

return options;
Expand Down
34 changes: 34 additions & 0 deletions src/FeatureSlice/Handle/FeatureSlice.Dispatch.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Definit.Results;
using Microsoft.Extensions.Hosting;

namespace FeatureSlice;

Expand Down Expand Up @@ -53,11 +54,44 @@ public sealed record FeatureSliceOptions(IServiceCollection Services);

public static class DispatcherExtensions
{
private sealed record Extension<THost>(Func<THost, IServiceProvider, Task> Run)
where THost : IHost;

public static FeatureSliceOptions AddFeatureSlices(this IServiceCollection services)
{
return new FeatureSliceOptions(services);
}

public static void AddFeatureSlicesExtension<THost>(this IServiceCollection services, Func<THost, IServiceProvider, Task> extension)
where THost : IHost
{
services.AddSingleton(new Extension<THost>(extension));
}

public static async Task MapFeatureSlices<T>(this T host)
where T : IHost
{
await using var scope = host.Services.CreateAsyncScope();

var provider = scope.ServiceProvider;

await Task.WhenAll
(
provider
.GetServices<Extension<T>>()
.Select(x => x.Run(host, provider))
.ToArray()
);

await Task.WhenAll
(
provider
.GetServices<Extension<IHost>>()
.Select(x => x.Run(host, provider))
.ToArray()
);
}

public static FeatureSliceOptions DefaultDispatcher(this FeatureSliceOptions options)
{
options.Services.Add(IDispatcher.RegisterDefault());
Expand Down
2 changes: 1 addition & 1 deletion src/Samples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
app.UseSwagger();
app.UseSwaggerUI();

app.MapFeatureSlices();
await app.MapFeatureSlices();

await app.RunAsync();
6 changes: 3 additions & 3 deletions src/Samples/Sample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public sealed record Response(int Value0, int Value1, string Value2);

return new Response(request.Value2, request.Value1, request.Value0);
})
.MapPost("handler", builder => builder
.MapPost("handler", opt => opt
.Request
(
From.Route.Int("id"),
Expand All @@ -31,7 +31,7 @@ public sealed record Response(int Value0, int Value1, string Value2);
.WithTags("Handler"))
.WithJob
(
() => true,
() => DateTime.Now.TimeOfDay == new TimeSpan(10, 0, 0),
() => new ("testjob", 69, 420)
);
}
Expand All @@ -49,7 +49,7 @@ public sealed record Request(string Value0, int Value1);

return Result.Success;
})
.MapPost("consumer", builder => builder
.MapPost("consumer", opt => opt
.Request
(
From.Route.Int("id"),
Expand Down

0 comments on commit 1349900

Please sign in to comment.