Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Outbox and Internal Command #16

Merged
merged 1 commit into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ dotnet_diagnostic.S3903.severity = None

#
# MA0004: Use Task.ConfigureAwait(false)
dotnet_diagnostic.MA0004.severity = Suggestion
dotnet_diagnostic.MA0004.severity = None

# MA0049: Type name should not match containing namespace
dotnet_diagnostic.MA0049.severity = Suggestion
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;

namespace BuildingBlocks.Contracts.EventBus.Messages;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;

namespace BuildingBlocks.Contracts.EventBus.Messages;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;

namespace BuildingBlocks.Contracts.EventBus.Messages;

Expand Down
12 changes: 12 additions & 0 deletions src/BuildingBlocks/Core/CQRS/ICommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using MediatR;

namespace BuildingBlocks.Core.CQRS;

public interface ICommand : ICommand<Unit>
{
}

public interface ICommand<out T> : IRequest<T>
where T : notnull
{
}
14 changes: 14 additions & 0 deletions src/BuildingBlocks/Core/CQRS/ICommandHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using MediatR;

namespace BuildingBlocks.Core.CQRS;

public interface ICommandHandler<in TCommand> : ICommandHandler<TCommand, Unit>
where TCommand : ICommand<Unit>
{
}

public interface ICommandHandler<in TCommand, TResponse> : IRequestHandler<TCommand, TResponse>
where TCommand : ICommand<TResponse>
where TResponse : notnull
{
}
8 changes: 8 additions & 0 deletions src/BuildingBlocks/Core/CQRS/IQuery.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using MediatR;

namespace BuildingBlocks.Core.CQRS;

public interface IQuery<out T> : IRequest<T>
where T : notnull
{
}
9 changes: 9 additions & 0 deletions src/BuildingBlocks/Core/CQRS/IQueryHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using MediatR;

namespace BuildingBlocks.Core.CQRS;

public interface IQueryHandler<in TQuery, TResponse> : IRequestHandler<TQuery, TResponse>
where TQuery : IQuery<TResponse>
where TResponse : notnull
{
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace BuildingBlocks.Domain.Event;
namespace BuildingBlocks.Core.Event;

[Flags]
public enum EventType
Expand Down
6 changes: 6 additions & 0 deletions src/BuildingBlocks/Core/Event/IDomainEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace BuildingBlocks.Core.Event;

public interface IDomainEvent : IEvent
{

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using MassTransit;
using MediatR;

namespace BuildingBlocks.Domain.Event;
namespace BuildingBlocks.Core.Event;

public interface IEvent : INotification
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace BuildingBlocks.Domain.Event;
namespace BuildingBlocks.Core.Event;

public interface IHaveIntegrationEvent
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using MassTransit;
using MassTransit.Topology;

namespace BuildingBlocks.Domain.Event;
namespace BuildingBlocks.Core.Event;

[ExcludeFromTopology]
public interface IIntegrationEvent : IEvent
Expand Down
10 changes: 10 additions & 0 deletions src/BuildingBlocks/Core/Event/IInternalCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using BuildingBlocks.Core.CQRS;

namespace BuildingBlocks.Core.Event;

public interface IInternalCommand : ICommand
{
long Id { get; }
DateTime OccurredOn { get; }
string Type { get; }
}
13 changes: 13 additions & 0 deletions src/BuildingBlocks/Core/Event/InternalCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using BuildingBlocks.IdsGenerator;
using BuildingBlocks.Utils;

namespace BuildingBlocks.Core.Event;

public class InternalCommand : IInternalCommand
{
public long Id { get; set; } = SnowFlakIdGenerator.NewId();

public DateTime OccurredOn => DateTime.Now;

public string Type { get => TypeProvider.GetTypeName(GetType()); }
}
26 changes: 26 additions & 0 deletions src/BuildingBlocks/Core/Event/MessageEnvelope.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Google.Protobuf;

namespace BuildingBlocks.Core.Event;

public class MessageEnvelope
{
public MessageEnvelope(object? message, IDictionary<string, object?>? headers = null)
{
Message = message;
Headers = headers ?? new Dictionary<string, object?>();
}

public object? Message { get; init; }
public IDictionary<string, object?> Headers { get; init; }
}

public class MessageEnvelope<TMessage> : MessageEnvelope
where TMessage : class, IMessage
{
public MessageEnvelope(TMessage message, IDictionary<string, object?> header) : base(message, header)
{
Message = message;
}

public new TMessage? Message { get; }
}
Original file line number Diff line number Diff line change
@@ -1,88 +1,71 @@
using System.Security.Claims;
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;
using BuildingBlocks.MessageProcessor;
using BuildingBlocks.Web;
using MassTransit;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MessageEnvelope = BuildingBlocks.Core.Event.MessageEnvelope;

namespace BuildingBlocks.Domain;
namespace BuildingBlocks.Core;

public sealed class BusPublisher : IBusPublisher
public sealed class EventDispatcher : IEventDispatcher
{
private readonly IEventMapper _eventMapper;
private readonly ILogger<BusPublisher> _logger;
private readonly IPublishEndpoint _publishEndpoint;
private readonly ILogger<EventDispatcher> _logger;
private readonly IPersistMessageProcessor _persistMessageProcessor;
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly IServiceScopeFactory _serviceScopeFactory;

public BusPublisher(IServiceScopeFactory serviceScopeFactory,
public EventDispatcher(IServiceScopeFactory serviceScopeFactory,
IEventMapper eventMapper,
ILogger<BusPublisher> logger,
IPublishEndpoint publishEndpoint,
ILogger<EventDispatcher> logger,
IPersistMessageProcessor persistMessageProcessor,
IHttpContextAccessor httpContextAccessor)
{
_serviceScopeFactory = serviceScopeFactory;
_eventMapper = eventMapper;
_logger = logger;
_publishEndpoint = publishEndpoint;
_persistMessageProcessor = persistMessageProcessor;
_httpContextAccessor = httpContextAccessor;
}

public async Task SendAsync(IDomainEvent domainEvent,
CancellationToken cancellationToken = default) => await SendAsync(new[] { domainEvent }, cancellationToken);
CancellationToken cancellationToken = default) => await SendAsync(new[] {domainEvent}, cancellationToken);

public async Task SendAsync(IReadOnlyList<IDomainEvent> domainEvents, CancellationToken cancellationToken = default)
{
if (domainEvents is null) return;

_logger.LogTrace("Processing integration events start...");

var integrationEvents = await MapDomainEventToIntegrationEventAsync(domainEvents).ConfigureAwait(false);

if (!integrationEvents.Any()) return;
if (integrationEvents.Count == 0) return;

await PublishMessageToBroker(integrationEvents, cancellationToken);

_logger.LogTrace("Processing integration events done...");
foreach (var integrationEvent in integrationEvents)
{
await _persistMessageProcessor.PublishMessageAsync(new MessageEnvelope(integrationEvent, SetHeaders()),
cancellationToken);
}
}



public async Task SendAsync(IIntegrationEvent integrationEvent,
CancellationToken cancellationToken = default) => await SendAsync(new[] { integrationEvent }, cancellationToken);
CancellationToken cancellationToken = default) => await SendAsync(new[] {integrationEvent}, cancellationToken);

public async Task SendAsync(IReadOnlyList<IIntegrationEvent> integrationEvents, CancellationToken cancellationToken = default)
public async Task SendAsync(IReadOnlyList<IIntegrationEvent> integrationEvents,
CancellationToken cancellationToken = default)
{
if (integrationEvents is null) return;

_logger.LogTrace("Processing integration events start...");

await PublishMessageToBroker(integrationEvents, cancellationToken);

_logger.LogTrace("Processing integration events done...");
}

private async Task PublishMessageToBroker(IReadOnlyList<IIntegrationEvent> integrationEvents, CancellationToken cancellationToken)
{
foreach (var integrationEvent in integrationEvents)
{
await _publishEndpoint.Publish((object) integrationEvent, context =>
{
context.CorrelationId = _httpContextAccessor?.HttpContext?.GetCorrelationId();
context.Headers.Set("UserId",
_httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.NameIdentifier));
context.Headers.Set("UserName",
_httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.Name));
}, cancellationToken);

_logger.LogTrace("Publish a message with ID: {Id}", integrationEvent?.EventId);
}
await _persistMessageProcessor.PublishMessageAsync(new MessageEnvelope(integrationEvents, SetHeaders()),
cancellationToken);
}

private Task<IReadOnlyList<IIntegrationEvent>> MapDomainEventToIntegrationEventAsync(
IReadOnlyList<IDomainEvent> events)
{
_logger.LogTrace("Processing integration events start...");

var wrappedIntegrationEvents = GetWrappedIntegrationEvents(events.ToList())?.ToList();
if (wrappedIntegrationEvents?.Count > 0)
return Task.FromResult<IReadOnlyList<IIntegrationEvent>>(wrappedIntegrationEvents);
Expand All @@ -101,6 +84,8 @@ private Task<IReadOnlyList<IIntegrationEvent>> MapDomainEventToIntegrationEventA
integrationEvents.Add(integrationEvent);
}

_logger.LogTrace("Processing integration events done...");

return Task.FromResult<IReadOnlyList<IIntegrationEvent>>(integrationEvents);
}

Expand All @@ -118,4 +103,14 @@ private IEnumerable<IIntegrationEvent> GetWrappedIntegrationEvents(IReadOnlyList
yield return domainNotificationEvent;
}
}

private IDictionary<string, object> SetHeaders()
{
var headers = new Dictionary<string, object>();
headers.Add("CorrelationId", _httpContextAccessor?.HttpContext?.GetCorrelationId());
headers.Add("UserId", _httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.NameIdentifier));
headers.Add("UserName", _httpContextAccessor?.HttpContext?.User?.FindFirstValue(ClaimTypes.Name));

return headers;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;

namespace BuildingBlocks.Domain;
namespace BuildingBlocks.Core;

public interface IBusPublisher
public interface IEventDispatcher
{
public Task SendAsync(IReadOnlyList<IDomainEvent> domainEvents, CancellationToken cancellationToken = default);
public Task SendAsync(IDomainEvent domainEvent, CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;

namespace BuildingBlocks.Domain;
namespace BuildingBlocks.Core;

public interface IEventMapper
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;

namespace BuildingBlocks.Domain;
namespace BuildingBlocks.Core;

public record IntegrationEventWrapper<TDomainEventType>(TDomainEventType DomainEvent) : IIntegrationEvent
where TDomainEventType : IDomainEvent;
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;

namespace BuildingBlocks.Domain.Model
namespace BuildingBlocks.Core.Model
{
public abstract class Aggregate : Aggregate<long>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace BuildingBlocks.Domain.Model;
namespace BuildingBlocks.Core.Model;

public abstract class Entity : IEntity
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using BuildingBlocks.Domain.Event;
using BuildingBlocks.Core.Event;

namespace BuildingBlocks.Domain.Model;
namespace BuildingBlocks.Core.Model;

public interface IAggregate : IEntity
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace BuildingBlocks.Domain.Model;
namespace BuildingBlocks.Core.Model;

public interface IEntity
{
Expand Down
8 changes: 0 additions & 8 deletions src/BuildingBlocks/Domain/Event/IDomainEvent.cs

This file was deleted.

Loading