From 8a8aa302baa0782cdce4003bf60abfb15abd3f57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rastislav=20Novotn=C3=BD?= Date: Tue, 16 Jul 2024 23:10:24 +0800 Subject: [PATCH 1/4] RabbitMQ support --- Directory.Packages.props | 1 + PlanningPokerCore.sln | 14 + .../Duracellko.PlanningPoker.RabbitMQ.csproj | 34 +++ .../IMessageConverter.cs | 41 +++ .../MessageConverter.cs | 202 +++++++++++++ .../RabbitServiceBus.cs | 277 ++++++++++++++++++ .../RabbitServiceBusLogger.cs | 89 ++++++ .../Resources.Designer.cs | 72 +++++ .../Resources.resx | 123 ++++++++ .../Duracellko.PlanningPoker.Web.csproj | 1 + src/Duracellko.PlanningPoker.Web/Program.cs | 7 +- ...acellko.PlanningPoker.RabbitMQ.Test.csproj | 17 ++ .../MessageConverterTest.cs | 260 ++++++++++++++++ 13 files changed, 1137 insertions(+), 1 deletion(-) create mode 100644 src/Duracellko.PlanningPoker.RabbitMQ/Duracellko.PlanningPoker.RabbitMQ.csproj create mode 100644 src/Duracellko.PlanningPoker.RabbitMQ/IMessageConverter.cs create mode 100644 src/Duracellko.PlanningPoker.RabbitMQ/MessageConverter.cs create mode 100644 src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs create mode 100644 src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBusLogger.cs create mode 100644 src/Duracellko.PlanningPoker.RabbitMQ/Resources.Designer.cs create mode 100644 src/Duracellko.PlanningPoker.RabbitMQ/Resources.resx create mode 100644 test/Duracellko.PlanningPoker.RabbitMQ.Test/Duracellko.PlanningPoker.RabbitMQ.Test.csproj create mode 100644 test/Duracellko.PlanningPoker.RabbitMQ.Test/MessageConverterTest.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index cbc3a61..cb653c4 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -20,6 +20,7 @@ + diff --git a/PlanningPokerCore.sln b/PlanningPokerCore.sln index 81d96b4..6dcae87 100644 --- a/PlanningPokerCore.sln +++ b/PlanningPokerCore.sln @@ -48,6 +48,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ApplicationIntegrationSimul test\ApplicationIntegrationSimulator\README.md = test\ApplicationIntegrationSimulator\README.md EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Duracellko.PlanningPoker.RabbitMQ", "src\Duracellko.PlanningPoker.RabbitMQ\Duracellko.PlanningPoker.RabbitMQ.csproj", "{811F62DE-C15B-4A90-887C-22F68B17EE2A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Duracellko.PlanningPoker.RabbitMQ.Test", "test\Duracellko.PlanningPoker.RabbitMQ.Test\Duracellko.PlanningPoker.RabbitMQ.Test.csproj", "{9D21ABE1-B43C-4FD7-82D8-A4BA7AAF7991}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -106,6 +110,14 @@ Global {EDD1B4E6-C1DE-45F6-A86A-ABEA17C30CEE}.Debug|Any CPU.Build.0 = Debug|Any CPU {EDD1B4E6-C1DE-45F6-A86A-ABEA17C30CEE}.Release|Any CPU.ActiveCfg = Release|Any CPU {EDD1B4E6-C1DE-45F6-A86A-ABEA17C30CEE}.Release|Any CPU.Build.0 = Release|Any CPU + {811F62DE-C15B-4A90-887C-22F68B17EE2A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {811F62DE-C15B-4A90-887C-22F68B17EE2A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {811F62DE-C15B-4A90-887C-22F68B17EE2A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {811F62DE-C15B-4A90-887C-22F68B17EE2A}.Release|Any CPU.Build.0 = Release|Any CPU + {9D21ABE1-B43C-4FD7-82D8-A4BA7AAF7991}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9D21ABE1-B43C-4FD7-82D8-A4BA7AAF7991}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9D21ABE1-B43C-4FD7-82D8-A4BA7AAF7991}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9D21ABE1-B43C-4FD7-82D8-A4BA7AAF7991}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -125,6 +137,8 @@ Global {B5F7A7A3-C1F5-4497-98FE-6659932DDBCF} = {9BE135A3-3D75-40BD-BA92-5961167FD719} {EDD1B4E6-C1DE-45F6-A86A-ABEA17C30CEE} = {6620FD4B-0154-46A5-A929-8326DB18769F} {11DAA9C8-F593-4B97-95DC-25341AC22E98} = {6620FD4B-0154-46A5-A929-8326DB18769F} + {811F62DE-C15B-4A90-887C-22F68B17EE2A} = {9BE135A3-3D75-40BD-BA92-5961167FD719} + {9D21ABE1-B43C-4FD7-82D8-A4BA7AAF7991} = {6620FD4B-0154-46A5-A929-8326DB18769F} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {C22E21A9-E10B-4324-BD51-350342EA8627} diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/Duracellko.PlanningPoker.RabbitMQ.csproj b/src/Duracellko.PlanningPoker.RabbitMQ/Duracellko.PlanningPoker.RabbitMQ.csproj new file mode 100644 index 0000000..3107b81 --- /dev/null +++ b/src/Duracellko.PlanningPoker.RabbitMQ/Duracellko.PlanningPoker.RabbitMQ.csproj @@ -0,0 +1,34 @@ + + + + net8.0 + + + + true + + + + + + + + + + + + + True + True + Resources.resx + + + + + + ResXFileCodeGenerator + Resources.Designer.cs + + + + diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/IMessageConverter.cs b/src/Duracellko.PlanningPoker.RabbitMQ/IMessageConverter.cs new file mode 100644 index 0000000..27403d2 --- /dev/null +++ b/src/Duracellko.PlanningPoker.RabbitMQ/IMessageConverter.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using Duracellko.PlanningPoker.Azure; + +namespace Duracellko.PlanningPoker.RabbitMQ; + +/// +/// When implemented, then object is able to convert messages of type to RabbitMQ message and vice versa. +/// +public interface IMessageConverter +{ + /// + /// Gets headers of RabbitMQ message converted from . + /// + /// The message to convert. + /// Headers of the message. + IDictionary GetMessageHeaders(NodeMessage message); + + /// + /// Gets body of RabbitMQ message converted from . + /// + /// The message to convert. + /// Body of the message. + ReadOnlyMemory GetMessageBody(NodeMessage message); + + /// + /// Converts RabbitMQ message headers and body to object. + /// + /// Headers of the message to convert. + /// Body of the message to convert. + /// Converted message of NodeMessage type. + NodeMessage GetNodeMessage(IDictionary headers, ReadOnlyMemory body); + + /// + /// Gets decoded value of Rabbit MQ message header with specified key. + /// + /// The collection of header key-value pairs. + /// The key to get header value for. + /// Value header with specified key. + string? GetHeader(IDictionary headers, string key); +} diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/MessageConverter.cs b/src/Duracellko.PlanningPoker.RabbitMQ/MessageConverter.cs new file mode 100644 index 0000000..06648fe --- /dev/null +++ b/src/Duracellko.PlanningPoker.RabbitMQ/MessageConverter.cs @@ -0,0 +1,202 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using Duracellko.PlanningPoker.Azure; + +namespace Duracellko.PlanningPoker.RabbitMQ; + +/// +/// Instance of this class is able to convert messages of type to RabbitMQ message and vice versa. +/// +public class MessageConverter : IMessageConverter +{ + /// + /// Name of property in Rabbit MQ message holding recipient node ID. + /// + internal const string RecipientIdPropertyName = PropertyPrefix + "RecipientId"; + + /// + /// Name of property in Rabbit MQ message holding sender node ID. + /// + internal const string SenderIdPropertyName = PropertyPrefix + "SenderId"; + + private const string PropertyPrefix = "PlanningPoker-"; + private const string MessageTypePropertyName = PropertyPrefix + "MessageType"; + private const string MessageSubtypePropertyName = PropertyPrefix + "MessageSubtype"; + + private static readonly JsonSerializerOptions _jsonSerializerOptions = new JsonSerializerOptions + { + NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals + }; + + private static readonly Encoding _headersEncoding = Encoding.UTF8; + + /// + /// Gets headers of RabbitMQ message converted from . + /// + /// The message to convert. + /// Headers of the message. + public IDictionary GetMessageHeaders(NodeMessage message) + { + ArgumentNullException.ThrowIfNull(message); + + var headers = new Dictionary(); + SetHeader(headers, MessageTypePropertyName, message.MessageType.ToString()); + if (message.Data != null) + { + SetHeader(headers, MessageSubtypePropertyName, message.Data.GetType().Name); + } + + SetHeader(headers, SenderIdPropertyName, message.SenderNodeId); + SetHeader(headers, RecipientIdPropertyName, message.RecipientNodeId); + + return headers; + } + + /// + /// Gets body of RabbitMQ message converted from . + /// + /// The message to convert. + /// Body of the message. + public ReadOnlyMemory GetMessageBody(NodeMessage message) + { + ArgumentNullException.ThrowIfNull(message); + + if (message.MessageType == NodeMessageType.InitializeTeam || message.MessageType == NodeMessageType.TeamCreated) + { + return ConvertToMessageBody((byte[])message.Data!); + } + else if (message.Data != null) + { + return ConvertToMessageBody(message.Data); + } + else + { + return Array.Empty(); + } + } + + /// + /// Converts RabbitMQ message headers and body to object. + /// + /// Headers of the message to convert. + /// Body of the message to convert. + /// Converted message of NodeMessage type. + public NodeMessage GetNodeMessage(IDictionary headers, ReadOnlyMemory body) + { + ArgumentNullException.ThrowIfNull(headers); + + var messageTypeValue = GetHeader(headers, MessageTypePropertyName); + var messageType = (NodeMessageType)Enum.Parse(typeof(NodeMessageType), messageTypeValue!); + var messageSubtype = GetHeader(headers, MessageSubtypePropertyName); + + var result = new NodeMessage(messageType); + result.SenderNodeId = GetHeader(headers, SenderIdPropertyName); + result.RecipientNodeId = GetHeader(headers, RecipientIdPropertyName); + + switch (result.MessageType) + { + case NodeMessageType.ScrumTeamMessage: + if (string.Equals(messageSubtype, typeof(ScrumTeamMemberMessage).Name, StringComparison.OrdinalIgnoreCase)) + { + result.Data = ConvertFromMessageBody(body); + } + else if (string.Equals(messageSubtype, typeof(ScrumTeamMemberEstimationMessage).Name, StringComparison.OrdinalIgnoreCase)) + { + result.Data = ConvertFromMessageBody(body); + } + else if (string.Equals(messageSubtype, typeof(ScrumTeamEstimationSetMessage).Name, StringComparison.OrdinalIgnoreCase)) + { + result.Data = ConvertFromMessageBody(body); + } + else if (string.Equals(messageSubtype, typeof(ScrumTeamTimerMessage).Name, StringComparison.OrdinalIgnoreCase)) + { + result.Data = ConvertFromMessageBody(body); + } + else + { + result.Data = ConvertFromMessageBody(body); + } + + break; + case NodeMessageType.TeamCreated: + case NodeMessageType.InitializeTeam: + result.Data = ConvertFromMessageBody(body); + break; + case NodeMessageType.TeamList: + case NodeMessageType.RequestTeams: + result.Data = ConvertFromMessageBody(body); + break; + } + + return result; + } + + /// + /// Gets decoded value of Rabbit MQ message header with specified key. + /// + /// The collection of header key-value pairs. + /// The key to get header value for. + /// Value header with specified key. + public string? GetHeader(IDictionary headers, string key) + { + ArgumentNullException.ThrowIfNull(headers); + ArgumentNullException.ThrowIfNullOrEmpty(key); + + if (headers.TryGetValue(key, out var valueObject) && valueObject != null) + { + return _headersEncoding.GetString((byte[])valueObject); + } + + return null; + } + + private static ReadOnlyMemory ConvertToMessageBody(object data) + { + return JsonSerializer.SerializeToUtf8Bytes(data, data.GetType(), _jsonSerializerOptions); + } + + private static ReadOnlyMemory ConvertToMessageBody(byte[] data) + { + using (var dataStream = new MemoryStream()) + { + using (var deflateStream = new DeflateStream(dataStream, CompressionMode.Compress, true)) + { + deflateStream.Write(data, 0, data.Length); + deflateStream.Flush(); + } + + return dataStream.ToArray(); + } + } + + private static T? ConvertFromMessageBody(ReadOnlyMemory body) + { + return JsonSerializer.Deserialize(body.Span, _jsonSerializerOptions); + } + + private static byte[] ConvertFromMessageBody(ReadOnlyMemory body) + { + using (var dataStream = new MemoryStream(body.ToArray())) + { + using (var deflateStream = new DeflateStream(dataStream, CompressionMode.Decompress)) + { + using var memoryStream = new MemoryStream(); + deflateStream.CopyTo(memoryStream); + return memoryStream.ToArray(); + } + } + } + + private static void SetHeader(IDictionary headers, string key, string? value) + { + if (value != null) + { + headers[key] = _headersEncoding.GetBytes(value); + } + } +} diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs new file mode 100644 index 0000000..7f00890 --- /dev/null +++ b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs @@ -0,0 +1,277 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Reactive.Subjects; +using System.Threading.Tasks; +using Duracellko.PlanningPoker.Azure; +using Duracellko.PlanningPoker.Azure.Configuration; +using Duracellko.PlanningPoker.Azure.ServiceBus; +using Duracellko.PlanningPoker.Domain; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace Duracellko.PlanningPoker.RabbitMQ; + +/// +/// Sends and receives messages via service bus using RabbitMQ. +/// +public class RabbitServiceBus : IServiceBus, IDisposable +{ + private const string DefaultExchangeName = "PlanningPoker"; + + private readonly Subject _observableMessages = new Subject(); + private readonly IMessageConverter _messageConverter; + private readonly GuidProvider _guidProvider; + private readonly ILogger _logger; + + private volatile string? _nodeId; + private IConnection? _connection; + private IModel? _receivingChannel; + private string? _exchangeName; + + /// + /// Initializes a new instance of the class. + /// + /// The message converter. + /// The configuration of planning poker for Azure platform. + /// The GUID provider to provide new GUID objects. + /// Logger instance to log events. + public RabbitServiceBus( + IMessageConverter messageConverter, + IAzurePlanningPokerConfiguration configuration, + GuidProvider? guidProvider, + ILogger logger) + { + _messageConverter = messageConverter ?? throw new ArgumentNullException(nameof(messageConverter)); + Configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); + _guidProvider = guidProvider ?? GuidProvider.Default; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + /// Gets a configuration of planning poker for Azure platform. + /// + public IAzurePlanningPokerConfiguration Configuration { get; private set; } + + /// + /// Gets an observable object receiving messages from service bus. + /// + public IObservable ObservableMessages => _observableMessages; + + /// + /// Sends a message to RabbitMQ exchange. + /// + /// The message to send. + /// A representing the asynchronous operation. + public async Task SendMessage(NodeMessage message) + { + ArgumentNullException.ThrowIfNull(message); + + await Task.Run(() => SendMessageInternal(message)); + } + + /// + /// Register for receiving messages from other nodes. + /// + /// Current node ID. + /// A representing the asynchronous operation. + public Task Register(string nodeId) + { + ArgumentNullException.ThrowIfNullOrEmpty(nodeId); + + _nodeId = nodeId; + _connection = CreateConnectionFactory().CreateConnection(); + _connection.CallbackException += ConnectionOnCallbackException; + + InitializeExchangeName(); + var queueName = InitializeTopology(); + + var receivingChannel = _receivingChannel!; + var consumer = new EventingBasicConsumer(receivingChannel); + consumer.Received += ConsumerOnReceived; + receivingChannel.BasicConsume(queueName, true, consumer); + _logger.QueueCreated(_exchangeName, nodeId); + return Task.CompletedTask; + } + + /// + /// Stop receiving messages from other nodes. + /// + /// A representing the asynchronous operation. + public Task Unregister() + { + if (_receivingChannel != null) + { + _receivingChannel.Close(); + _receivingChannel.Dispose(); + _receivingChannel = null; + } + + if (_connection != null) + { + _connection.Close(); + _connection.CallbackException -= ConnectionOnCallbackException; + _connection.Dispose(); + _connection = null; + } + + if (!_observableMessages.IsDisposed) + { + _observableMessages.OnCompleted(); + _observableMessages.Dispose(); + } + + if (_nodeId != null) + { + _logger.QueueClosed(_exchangeName, _nodeId); + _nodeId = null; + } + + return Task.CompletedTask; + } + + /// + /// Releases all resources. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases all unmanaged and optionally managed resources. + /// + /// True if disposing not using GC; otherwise false. + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + Unregister().Wait(); + } + } + + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Log error.")] + private void SendMessageInternal(NodeMessage message) + { + var connection = _connection; + if (connection == null) + { + throw new InvalidOperationException(Resources.Error_RabbitMQNotInitialized); + } + + IModel? sendingChannel = null; + try + { + sendingChannel = connection.CreateModel(); + + var properties = CreateBasicProperties(message, sendingChannel); + properties.Headers = _messageConverter.GetMessageHeaders(message); + var body = _messageConverter.GetMessageBody(message); + + sendingChannel.BasicPublish(_exchangeName, string.Empty, properties, body); + _logger.SendMessage(properties.MessageId); + } + catch (Exception ex) + { + _logger.ErrorSendMessage(ex); + } + finally + { + sendingChannel?.Close(); + sendingChannel?.Dispose(); + } + } + + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Log error and try again.")] + private void ConsumerOnReceived(object? sender, BasicDeliverEventArgs e) + { + var nodeId = _nodeId; + if (nodeId != null) + { + var headers = e.BasicProperties.Headers; + var senderId = _messageConverter.GetHeader(headers, MessageConverter.SenderIdPropertyName); + var recipientId = _messageConverter.GetHeader(headers, MessageConverter.RecipientIdPropertyName); + + var messageId = e.BasicProperties.MessageId; + _logger.MessageReceived(_exchangeName, nodeId, messageId); + + if (senderId == nodeId || (recipientId != null && recipientId != nodeId)) + { + return; + } + + try + { + var nodeMessage = _messageConverter.GetNodeMessage(headers, e.Body); + _observableMessages.OnNext(nodeMessage); + _logger.MessageProcessed(_exchangeName, nodeId, messageId); + } + catch (Exception ex) + { + _logger.ErrorProcessMessage(ex, _exchangeName, nodeId, messageId); + } + } + } + + private void ConnectionOnCallbackException(object? sender, CallbackExceptionEventArgs e) + { + _logger.ConnectionCallbackError(e.Exception, _exchangeName, _nodeId); + } + + private ConnectionFactory CreateConnectionFactory() + { + var uri = Configuration.ServiceBusConnectionString!; + if (uri.StartsWith("RABBITMQ:", StringComparison.Ordinal)) + { + uri = uri.Substring(9); + } + + return new ConnectionFactory() + { + Uri = new Uri(uri) + }; + } + + private void InitializeExchangeName() + { + _exchangeName = Configuration.ServiceBusTopic; + if (string.IsNullOrEmpty(_exchangeName)) + { + _exchangeName = DefaultExchangeName; + } + } + + private string InitializeTopology() + { + var receivingChannel = _connection!.CreateModel(); + receivingChannel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout); + + var queueParameters = new Dictionary() + { + { "message-ttl", 60000 } + }; + var queue = receivingChannel.QueueDeclare(arguments: queueParameters); + var queueName = queue.QueueName; + receivingChannel.QueueBind(queueName, _exchangeName, string.Empty); + + _receivingChannel = receivingChannel; + return queueName; + } + + private IBasicProperties CreateBasicProperties(NodeMessage message, IModel model) + { + var properties = model.CreateBasicProperties(); + + properties.MessageId = _guidProvider.NewGuid().ToString(); + properties.Type = message.MessageType.ToString(); + + if (message.Data != null) + { + properties.ContentType = message.Data is byte[] ? "application/octet-stream" : "application/json"; + } + + return properties; + } +} diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBusLogger.cs b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBusLogger.cs new file mode 100644 index 0000000..13ca5be --- /dev/null +++ b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBusLogger.cs @@ -0,0 +1,89 @@ +using System; +using Microsoft.Extensions.Logging; + +namespace Duracellko.PlanningPoker.RabbitMQ; + +internal static class RabbitServiceBusLogger +{ + private const int BaseEventId = 1750; + + private static readonly Action _sendMessage = LoggerMessage.Define( + LogLevel.Debug, + new EventId(BaseEventId + 1, nameof(SendMessage)), + "Message sent to RabbitMQ. MessageID: {MessageId}"); + + private static readonly Action _errorSendMessage = LoggerMessage.Define( + LogLevel.Error, + new EventId(BaseEventId + 2, nameof(ErrorSendMessage)), + "Error sending message to RabbitMQ."); + + private static readonly Action _messageReceived = LoggerMessage.Define( + LogLevel.Debug, + new EventId(BaseEventId + 3, nameof(MessageReceived)), + "RabbitMQ message was received (Channel: {Channel}, NodeID: {NodeId}, MessageID: {MessageId})"); + + private static readonly Action _messageProcessed = LoggerMessage.Define( + LogLevel.Information, + new EventId(BaseEventId + 4, nameof(MessageProcessed)), + "RabbitMQ message was processed (Channel: {Channel}, NodeID: {NodeId}, MessageID: {MessageId})"); + + private static readonly Action _errorProcessMessage = LoggerMessage.Define( + LogLevel.Error, + new EventId(BaseEventId + 5, nameof(ErrorProcessMessage)), + "RabbitMQ message processing failed (Channel: {Channel}, NodeID: {NodeId}, MessageID: {MessageId})"); + + private static readonly Action _queueCreated = LoggerMessage.Define( + LogLevel.Debug, + new EventId(BaseEventId + 6, nameof(QueueCreated)), + "RabbitMQ queue was created (Channel: {Channel}, NodeID: {NodeId})"); + + private static readonly Action _queueClosed = LoggerMessage.Define( + LogLevel.Debug, + new EventId(BaseEventId + 7, nameof(QueueClosed)), + "RabbitMQ queue was closed (Channel: {Channel}, NodeID: {NodeId})"); + + private static readonly Action _connectionCallbackError = LoggerMessage.Define( + LogLevel.Error, + new EventId(BaseEventId + 8, nameof(ConnectionCallbackError)), + "RabbitMQ connection callback failed (Channel: {Channel}, NodeID: {NodeId})"); + + public static void SendMessage(this ILogger logger, string? messageId) + { + _sendMessage(logger, messageId, null); + } + + public static void ErrorSendMessage(this ILogger logger, Exception exception) + { + _errorSendMessage(logger, exception); + } + + public static void MessageReceived(this ILogger logger, string? channel, string? nodeId, string? messageId) + { + _messageReceived(logger, channel, nodeId, messageId, null); + } + + public static void MessageProcessed(this ILogger logger, string? channel, string? nodeId, string? messageId) + { + _messageProcessed(logger, channel, nodeId, messageId, null); + } + + public static void ErrorProcessMessage(this ILogger logger, Exception exception, string? channel, string? nodeId, string? messageId) + { + _errorProcessMessage(logger, channel, nodeId, messageId, exception); + } + + public static void QueueCreated(this ILogger logger, string? channel, string? nodeId) + { + _queueCreated(logger, channel, nodeId, null); + } + + public static void QueueClosed(this ILogger logger, string? channel, string? nodeId) + { + _queueClosed(logger, channel, nodeId, null); + } + + public static void ConnectionCallbackError(this ILogger logger, Exception exception, string? channel, string? nodeId) + { + _connectionCallbackError(logger, channel, nodeId, exception); + } +} diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/Resources.Designer.cs b/src/Duracellko.PlanningPoker.RabbitMQ/Resources.Designer.cs new file mode 100644 index 0000000..f908d4b --- /dev/null +++ b/src/Duracellko.PlanningPoker.RabbitMQ/Resources.Designer.cs @@ -0,0 +1,72 @@ +//------------------------------------------------------------------------------ +// +// This code was generated by a tool. +// Runtime Version:4.0.30319.42000 +// +// Changes to this file may cause incorrect behavior and will be lost if +// the code is regenerated. +// +//------------------------------------------------------------------------------ + +namespace Duracellko.PlanningPoker.RabbitMQ { + using System; + + + /// + /// A strongly-typed resource class, for looking up localized strings, etc. + /// + // This class was auto-generated by the StronglyTypedResourceBuilder + // class via a tool like ResGen or Visual Studio. + // To add or remove a member, edit your .ResX file then rerun ResGen + // with the /str option, or rebuild your VS project. + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "17.0.0.0")] + [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] + [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()] + internal class Resources { + + private static global::System.Resources.ResourceManager resourceMan; + + private static global::System.Globalization.CultureInfo resourceCulture; + + [global::System.Diagnostics.CodeAnalysis.SuppressMessageAttribute("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")] + internal Resources() { + } + + /// + /// Returns the cached ResourceManager instance used by this class. + /// + [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] + internal static global::System.Resources.ResourceManager ResourceManager { + get { + if (object.ReferenceEquals(resourceMan, null)) { + global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("Duracellko.PlanningPoker.RabbitMQ.Resources", typeof(Resources).Assembly); + resourceMan = temp; + } + return resourceMan; + } + } + + /// + /// Overrides the current thread's CurrentUICulture property for all + /// resource lookups using this strongly typed resource class. + /// + [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] + internal static global::System.Globalization.CultureInfo Culture { + get { + return resourceCulture; + } + set { + resourceCulture = value; + } + } + + /// + /// Looks up a localized string similar to RabbitMQ is not initialized.. + /// + internal static string Error_RabbitMQNotInitialized { + get { + return ResourceManager.GetString("Error_RabbitMQNotInitialized", resourceCulture); + } + } + } +} diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/Resources.resx b/src/Duracellko.PlanningPoker.RabbitMQ/Resources.resx new file mode 100644 index 0000000..b011173 --- /dev/null +++ b/src/Duracellko.PlanningPoker.RabbitMQ/Resources.resx @@ -0,0 +1,123 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + text/microsoft-resx + + + 2.0 + + + System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + RabbitMQ is not initialized. + + \ No newline at end of file diff --git a/src/Duracellko.PlanningPoker.Web/Duracellko.PlanningPoker.Web.csproj b/src/Duracellko.PlanningPoker.Web/Duracellko.PlanningPoker.Web.csproj index beb5ce8..ee7aede 100644 --- a/src/Duracellko.PlanningPoker.Web/Duracellko.PlanningPoker.Web.csproj +++ b/src/Duracellko.PlanningPoker.Web/Duracellko.PlanningPoker.Web.csproj @@ -21,6 +21,7 @@ + diff --git a/src/Duracellko.PlanningPoker.Web/Program.cs b/src/Duracellko.PlanningPoker.Web/Program.cs index 27c5b47..d4ce3e4 100644 --- a/src/Duracellko.PlanningPoker.Web/Program.cs +++ b/src/Duracellko.PlanningPoker.Web/Program.cs @@ -98,9 +98,14 @@ private static void ConfigureServices(IServiceCollection services, IConfiguratio services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); - if (planningPokerConfiguration.ServiceBusConnectionString!.StartsWith("REDIS:", StringComparison.Ordinal)) + if (planningPokerConfiguration.ServiceBusConnectionString!.StartsWith("RABBITMQ:", StringComparison.Ordinal)) + { + services.AddSingleton(); + } + else if (planningPokerConfiguration.ServiceBusConnectionString.StartsWith("REDIS:", StringComparison.Ordinal)) { services.AddSingleton(); healthChecks.AddCheck("Redis"); diff --git a/test/Duracellko.PlanningPoker.RabbitMQ.Test/Duracellko.PlanningPoker.RabbitMQ.Test.csproj b/test/Duracellko.PlanningPoker.RabbitMQ.Test/Duracellko.PlanningPoker.RabbitMQ.Test.csproj new file mode 100644 index 0000000..1ddfd12 --- /dev/null +++ b/test/Duracellko.PlanningPoker.RabbitMQ.Test/Duracellko.PlanningPoker.RabbitMQ.Test.csproj @@ -0,0 +1,17 @@ + + + + net8.0 + + + + + + + + + + + + + diff --git a/test/Duracellko.PlanningPoker.RabbitMQ.Test/MessageConverterTest.cs b/test/Duracellko.PlanningPoker.RabbitMQ.Test/MessageConverterTest.cs new file mode 100644 index 0000000..573ae0f --- /dev/null +++ b/test/Duracellko.PlanningPoker.RabbitMQ.Test/MessageConverterTest.cs @@ -0,0 +1,260 @@ +using System; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text; +using Duracellko.PlanningPoker.Azure; +using Duracellko.PlanningPoker.Domain; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Duracellko.PlanningPoker.RabbitMQ.Test; + +[TestClass] +public class MessageConverterTest +{ + private const string SenderId = "3d1c7636-ae1d-4288-b1e1-0dccc8989722"; + private const string RecipientId = "10243241-802e-4d66-b4fc-55c76c23bcb2"; + private const string TeamName = "My Team"; + private const string Team1Json = "{\"Name\":\"My Team\",\"State\":1,\"AvailableEstimations\":[{\"Value\":0.0},{\"Value\":0.5},{\"Value\":1.0},{\"Value\":2.0},{\"Value\":3.0},{\"Value\":5.0},{\"Value\":8.0},{\"Value\":13.0},{\"Value\":20.0},{\"Value\":40.0},{\"Value\":100.0},{\"Value\":\"Infinity\"},{\"Value\":null}],\"Members\":[{\"Name\":\"Duracellko\",\"MemberType\":2,\"Messages\":[],\"LastMessageId\":3,\"LastActivity\":\"2020-05-24T14:46:48.1509407Z\",\"IsDormant\":false,\"Estimation\":null},{\"Name\":\"Me\",\"MemberType\":1,\"Messages\":[],\"LastMessageId\":2,\"LastActivity\":\"2020-05-24T14:47:40.119354Z\",\"IsDormant\":false,\"Estimation\":{\"Value\":20.0}}],\"EstimationResult\":{\"Duracellko\":null,\"Me\":{\"Value\":20.0}}}"; + private const string Team2Json = "{\"Name\":\"My Team\",\"State\":1,\"AvailableEstimations\":[{\"Value\":0.0},{\"Value\":0.5},{\"Value\":1.0},{\"Value\":2.0},{\"Value\":3.0},{\"Value\":5.0},{\"Value\":8.0},{\"Value\":13.0},{\"Value\":20.0},{\"Value\":40.0},{\"Value\":100.0},{\"Value\":\"Infinity\"},{\"Value\":null}],\"Members\":[{\"Name\":\"Duracellko\",\"MemberType\":2,\"Messages\":[],\"LastMessageId\":9,\"LastActivity\":\"2020-05-24T14:53:07.6381166Z\",\"IsDormant\":false,\"Estimation\":{\"Value\":2.0}},{\"Name\":\"Me\",\"MemberType\":1,\"Messages\":[],\"LastMessageId\":8,\"LastActivity\":\"2020-05-24T14:53:05.8193334Z\",\"IsDormant\":false,\"Estimation\":{\"Value\":5.0}},{\"Name\":\"Test\",\"MemberType\":1,\"Messages\":[{\"Id\":4,\"MessageType\":6,\"MemberName\":\"Duracellko\",\"EstimationResult\":null},{\"Id\":5,\"MessageType\":6,\"MemberName\":\"Me\",\"EstimationResult\":null}],\"LastMessageId\":5,\"LastActivity\":\"2020-05-24T14:52:40.0708949Z\",\"IsDormant\":false,\"Estimation\":null}],\"EstimationResult\":{\"Duracellko\":{\"Value\":2.0},\"Me\":{\"Value\":5.0},\"Test\":null}}"; + + [TestMethod] + public void GetHeaders_Null_ArgumentNullException() + { + var target = new MessageConverter(); + Assert.ThrowsException(() => target.GetMessageHeaders(null!)); + } + + [TestMethod] + public void GetMessageBody_Null_ArgumentNullException() + { + var target = new MessageConverter(); + Assert.ThrowsException(() => target.GetMessageBody(null!)); + } + + [TestMethod] + public void GetNodeMessage_Null_ArgumentNullException() + { + var target = new MessageConverter(); + Assert.ThrowsException(() => target.GetNodeMessage(null!, null)); + } + + [TestMethod] + public void ConvertToRabbitMQMessageAndBack_ScrumTeamMessage() + { + var scrumTeamMessage = new ScrumTeamMessage(TeamName, MessageType.EstimationStarted); + var nodeMessage = new NodeMessage(NodeMessageType.ScrumTeamMessage) + { + SenderNodeId = SenderId, + Data = scrumTeamMessage + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + var resultData = (ScrumTeamMessage)result.Data!; + + Assert.AreEqual(MessageType.EstimationStarted, resultData.MessageType); + Assert.AreEqual(TeamName, resultData.TeamName); + } + + [TestMethod] + public void ConvertToRabbitMQMessageAndBack_ScrumTeamMemberMessage() + { + var scrumTeamMessage = new ScrumTeamMemberMessage(TeamName, MessageType.MemberJoined) + { + MemberType = "Observer", + MemberName = "Test person", + SessionId = Guid.NewGuid() + }; + var nodeMessage = new NodeMessage(NodeMessageType.ScrumTeamMessage) + { + SenderNodeId = SenderId, + Data = scrumTeamMessage + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + var resultData = (ScrumTeamMemberMessage)result.Data!; + + Assert.AreEqual(MessageType.MemberJoined, resultData.MessageType); + Assert.AreEqual(TeamName, resultData.TeamName); + Assert.AreEqual(scrumTeamMessage.MemberType, resultData.MemberType); + Assert.AreEqual(scrumTeamMessage.MemberName, resultData.MemberName); + Assert.AreEqual(scrumTeamMessage.SessionId, resultData.SessionId); + } + + [DataTestMethod] + [DataRow(8.0)] + [DataRow(0.5)] + [DataRow(0.0)] + [DataRow(null)] + [DataRow(double.PositiveInfinity)] + public void ConvertToRabbitMQMessageAndBack_ScrumTeamMemberEstimationMessage(double? estimation) + { + var scrumTeamMessage = new ScrumTeamMemberEstimationMessage(TeamName, MessageType.MemberEstimated) + { + MemberName = "Scrum Master", + Estimation = estimation + }; + var nodeMessage = new NodeMessage(NodeMessageType.ScrumTeamMessage) + { + SenderNodeId = SenderId, + Data = scrumTeamMessage + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + var resultData = (ScrumTeamMemberEstimationMessage)result.Data!; + + Assert.AreEqual(MessageType.MemberEstimated, resultData.MessageType); + Assert.AreEqual(TeamName, resultData.TeamName); + Assert.AreEqual(scrumTeamMessage.MemberName, resultData.MemberName); + Assert.AreEqual(scrumTeamMessage.Estimation, resultData.Estimation); + } + + [TestMethod] + public void ConvertToRabbitMQMessageAndBack_ScrumTeamEstimationSetMessage() + { + var deck = DeckProvider.Default.GetDefaultDeck().Select(e => e.Value).ToList(); + var scrumTeamMessage = new ScrumTeamEstimationSetMessage(TeamName, MessageType.AvailableEstimationsChanged) + { + Estimations = deck + }; + var nodeMessage = new NodeMessage(NodeMessageType.ScrumTeamMessage) + { + SenderNodeId = SenderId, + Data = scrumTeamMessage + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + var resultData = (ScrumTeamEstimationSetMessage)result.Data!; + + Assert.AreEqual(MessageType.AvailableEstimationsChanged, resultData.MessageType); + Assert.AreEqual(TeamName, resultData.TeamName); + CollectionAssert.AreEqual(deck, resultData.Estimations.ToList()); + } + + [TestMethod] + public void ConvertToRabbitMQMessageAndBack_ScrumTeamTimerMessage() + { + var scrumTeamMessage = new ScrumTeamTimerMessage(TeamName, MessageType.TimerStarted) + { + EndTime = new DateTime(2021, 11, 16, 23, 49, 31, DateTimeKind.Utc) + }; + var nodeMessage = new NodeMessage(NodeMessageType.ScrumTeamMessage) + { + SenderNodeId = SenderId, + Data = scrumTeamMessage + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + var resultData = (ScrumTeamTimerMessage)result.Data!; + + Assert.AreEqual(MessageType.TimerStarted, resultData.MessageType); + Assert.AreEqual(TeamName, resultData.TeamName); + Assert.AreEqual(scrumTeamMessage.EndTime, resultData.EndTime); + Assert.AreEqual(DateTimeKind.Utc, resultData.EndTime.Kind); + } + + [TestMethod] + public void ConvertToRabbitMQMessageAndBack_TeamCreated() + { + var nodeMessage = new NodeMessage(NodeMessageType.TeamCreated) + { + SenderNodeId = SenderId, + Data = Encoding.UTF8.GetBytes(Team1Json) + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + + var resultJson = Encoding.UTF8.GetString((byte[])result.Data!); + Assert.AreEqual(Team1Json, resultJson); + } + + [TestMethod] + public void ConvertToRabbitMQMessageAndBack_RequestTeamList() + { + var nodeMessage = new NodeMessage(NodeMessageType.RequestTeamList) + { + SenderNodeId = SenderId + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + + Assert.IsNotNull(result); + } + + [TestMethod] + [SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1011:Closing square brackets should be spaced correctly", Justification = "Nullable array")] + public void ConvertToRabbitMQMessageAndBack_TeamList() + { + var teamList = new[] { TeamName, "Test", "Hello, World!" }; + var nodeMessage = new NodeMessage(NodeMessageType.TeamList) + { + SenderNodeId = SenderId, + RecipientNodeId = RecipientId, + Data = teamList + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + + CollectionAssert.AreEqual(teamList, (string[]?)result.Data); + } + + [TestMethod] + [SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1011:Closing square brackets should be spaced correctly", Justification = "Nullable array")] + public void ConvertToRabbitMQMessageAndBack_RequestTeams() + { + var teamList = new[] { TeamName }; + var nodeMessage = new NodeMessage(NodeMessageType.RequestTeams) + { + SenderNodeId = SenderId, + RecipientNodeId = RecipientId, + Data = teamList + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + + CollectionAssert.AreEqual(teamList, (string[]?)result.Data); + } + + [TestMethod] + public void ConvertToRabbitMQMessageAndBack_InitializeTeam() + { + var nodeMessage = new NodeMessage(NodeMessageType.InitializeTeam) + { + SenderNodeId = SenderId, + RecipientNodeId = RecipientId, + Data = Encoding.UTF8.GetBytes(Team2Json) + }; + + var result = ConvertToRabbitMQMessageAndBack(nodeMessage); + + var resultJson = Encoding.UTF8.GetString((byte[])result.Data!); + Assert.AreEqual(Team2Json, resultJson); + } + + private static NodeMessage ConvertToRabbitMQMessageAndBack(NodeMessage nodeMessage) + { + var target = new MessageConverter(); + + var headers = target.GetMessageHeaders(nodeMessage); + var body = target.GetMessageBody(nodeMessage); + + var result = target.GetNodeMessage(headers, body); + + Assert.IsNotNull(result); + Assert.AreNotSame(nodeMessage, result); + Assert.AreEqual(nodeMessage.MessageType, result.MessageType); + Assert.AreEqual(nodeMessage.SenderNodeId, result.SenderNodeId); + Assert.AreEqual(nodeMessage.RecipientNodeId, result.RecipientNodeId); + + if (nodeMessage.Data == null) + { + Assert.IsNull(result.Data); + } + else + { + Assert.IsNotNull(result.Data); + Assert.AreEqual(nodeMessage.Data.GetType(), result.Data.GetType()); + } + + return result; + } +} From 6e4285c970d46cf7acc030bf7be2219affe39e0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rastislav=20Novotn=C3=BD?= Date: Tue, 23 Jul 2024 10:37:09 +0800 Subject: [PATCH 2/4] Acknowledge RabbitMQ message --- .../RabbitServiceBus.cs | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs index 7f00890..564b7af 100644 --- a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs +++ b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs @@ -20,6 +20,8 @@ public class RabbitServiceBus : IServiceBus, IDisposable { private const string DefaultExchangeName = "PlanningPoker"; + private static readonly TimeSpan PublishTimeout = TimeSpan.FromSeconds(5); + private readonly Subject _observableMessages = new Subject(); private readonly IMessageConverter _messageConverter; private readonly GuidProvider _guidProvider; @@ -68,7 +70,11 @@ public async Task SendMessage(NodeMessage message) { ArgumentNullException.ThrowIfNull(message); - await Task.Run(() => SendMessageInternal(message)); + var shouldRetry = await Task.Run(() => SendMessage(message, false)); + if (shouldRetry) + { + await Task.Run(() => SendMessage(message, true)); + } } /// @@ -90,7 +96,7 @@ public Task Register(string nodeId) var receivingChannel = _receivingChannel!; var consumer = new EventingBasicConsumer(receivingChannel); consumer.Received += ConsumerOnReceived; - receivingChannel.BasicConsume(queueName, true, consumer); + receivingChannel.BasicConsume(queueName, false, consumer); _logger.QueueCreated(_exchangeName, nodeId); return Task.CompletedTask; } @@ -153,7 +159,7 @@ protected virtual void Dispose(bool disposing) } [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Log error.")] - private void SendMessageInternal(NodeMessage message) + private bool SendMessage(NodeMessage message, bool isRetry) { var connection = _connection; if (connection == null) @@ -165,17 +171,25 @@ private void SendMessageInternal(NodeMessage message) try { sendingChannel = connection.CreateModel(); + sendingChannel.ConfirmSelect(); var properties = CreateBasicProperties(message, sendingChannel); properties.Headers = _messageConverter.GetMessageHeaders(message); var body = _messageConverter.GetMessageBody(message); sendingChannel.BasicPublish(_exchangeName, string.Empty, properties, body); + sendingChannel.WaitForConfirmsOrDie(PublishTimeout); _logger.SendMessage(properties.MessageId); + return false; } catch (Exception ex) { - _logger.ErrorSendMessage(ex); + if (isRetry) + { + _logger.ErrorSendMessage(ex); + } + + return !isRetry; } finally { @@ -188,7 +202,8 @@ private void SendMessageInternal(NodeMessage message) private void ConsumerOnReceived(object? sender, BasicDeliverEventArgs e) { var nodeId = _nodeId; - if (nodeId != null) + var receivingChannel = _receivingChannel; + if (nodeId != null && receivingChannel != null) { var headers = e.BasicProperties.Headers; var senderId = _messageConverter.GetHeader(headers, MessageConverter.SenderIdPropertyName); @@ -206,10 +221,12 @@ private void ConsumerOnReceived(object? sender, BasicDeliverEventArgs e) { var nodeMessage = _messageConverter.GetNodeMessage(headers, e.Body); _observableMessages.OnNext(nodeMessage); + receivingChannel.BasicAck(e.DeliveryTag, false); _logger.MessageProcessed(_exchangeName, nodeId, messageId); } catch (Exception ex) { + receivingChannel.BasicNack(e.DeliveryTag, false, false); _logger.ErrorProcessMessage(ex, _exchangeName, nodeId, messageId); } } From ac5b6978bd8d8eb91fc0b9afd77b9291480436c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rastislav=20Novotn=C3=BD?= Date: Sun, 18 Aug 2024 14:33:10 +0800 Subject: [PATCH 3/4] RabbitMQ health check --- .../RabbitHealthCheck.cs | 38 +++++++ .../RabbitServiceBus.cs | 104 ++++++++++++------ .../RabbitServiceBusLogger.cs | 12 +- .../Resources.Designer.cs | 18 +++ .../Resources.resx | 6 + src/Duracellko.PlanningPoker.Web/Program.cs | 4 +- 6 files changed, 145 insertions(+), 37 deletions(-) create mode 100644 src/Duracellko.PlanningPoker.RabbitMQ/RabbitHealthCheck.cs diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitHealthCheck.cs b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitHealthCheck.cs new file mode 100644 index 0000000..eb40153 --- /dev/null +++ b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitHealthCheck.cs @@ -0,0 +1,38 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Diagnostics.HealthChecks; + +namespace Duracellko.PlanningPoker.RabbitMQ; + +/// +/// Health check that reports status of RabbitMQ connection. +/// +public class RabbitHealthCheck : IHealthCheck +{ + private readonly RabbitServiceBus _rabbitServiceBus; + + /// + /// Initializes a new instance of the class. + /// + /// The service that manages connection to RabbitMQ. + public RabbitHealthCheck(RabbitServiceBus rabbitServiceBus) + { + ArgumentNullException.ThrowIfNull(rabbitServiceBus); + _rabbitServiceBus = rabbitServiceBus; + } + + /// + /// Runs the health check, returning the status of the RabbitMQ connection. + /// + /// A context object associated with the current execution. + /// A that can be used to cancel the health check. + /// The health status of the RabbitMQ connection. + public Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) + { + var healthResult = _rabbitServiceBus.IsConnected ? + HealthCheckResult.Healthy(Resources.Health_RabbitHealthy) : + HealthCheckResult.Unhealthy(Resources.Health_RabbitUnhealthy); + return Task.FromResult(healthResult); + } +} diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs index 564b7af..0647a7d 100644 --- a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs +++ b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBus.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Reactive.Subjects; using System.Threading.Tasks; @@ -19,6 +18,7 @@ namespace Duracellko.PlanningPoker.RabbitMQ; public class RabbitServiceBus : IServiceBus, IDisposable { private const string DefaultExchangeName = "PlanningPoker"; + private const string QueuePrefix = "PlanningPoker-"; private static readonly TimeSpan PublishTimeout = TimeSpan.FromSeconds(5); @@ -30,7 +30,9 @@ public class RabbitServiceBus : IServiceBus, IDisposable private volatile string? _nodeId; private IConnection? _connection; private IModel? _receivingChannel; - private string? _exchangeName; + private string? _sendingExchangeName; + private string? _receivingExchangeName; + private string? _queueName; /// /// Initializes a new instance of the class. @@ -51,6 +53,11 @@ public RabbitServiceBus( _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } + /// + /// Gets a value indicating whether the connection to RabbitMQ is opened. + /// + public bool IsConnected => _connection != null && _connection.IsOpen; + /// /// Gets a configuration of planning poker for Azure platform. /// @@ -91,13 +98,15 @@ public Task Register(string nodeId) _connection.CallbackException += ConnectionOnCallbackException; InitializeExchangeName(); - var queueName = InitializeTopology(); + InitializeTopology(); var receivingChannel = _receivingChannel!; + var queueName = _queueName!; + var consumer = new EventingBasicConsumer(receivingChannel); consumer.Received += ConsumerOnReceived; receivingChannel.BasicConsume(queueName, false, consumer); - _logger.QueueCreated(_exchangeName, nodeId); + _logger.QueueCreated(_receivingExchangeName, nodeId); return Task.CompletedTask; } @@ -107,12 +116,7 @@ public Task Register(string nodeId) /// A representing the asynchronous operation. public Task Unregister() { - if (_receivingChannel != null) - { - _receivingChannel.Close(); - _receivingChannel.Dispose(); - _receivingChannel = null; - } + DeleteQueue(); if (_connection != null) { @@ -122,18 +126,19 @@ public Task Unregister() _connection = null; } - if (!_observableMessages.IsDisposed) + if (_receivingChannel != null) { - _observableMessages.OnCompleted(); - _observableMessages.Dispose(); + _receivingChannel.Dispose(); + _receivingChannel = null; } - if (_nodeId != null) + if (!_observableMessages.IsDisposed) { - _logger.QueueClosed(_exchangeName, _nodeId); - _nodeId = null; + _observableMessages.OnCompleted(); + _observableMessages.Dispose(); } + _nodeId = null; return Task.CompletedTask; } @@ -177,7 +182,7 @@ private bool SendMessage(NodeMessage message, bool isRetry) properties.Headers = _messageConverter.GetMessageHeaders(message); var body = _messageConverter.GetMessageBody(message); - sendingChannel.BasicPublish(_exchangeName, string.Empty, properties, body); + sendingChannel.BasicPublish(_sendingExchangeName, string.Empty, properties, body); sendingChannel.WaitForConfirmsOrDie(PublishTimeout); _logger.SendMessage(properties.MessageId); return false; @@ -210,7 +215,7 @@ private void ConsumerOnReceived(object? sender, BasicDeliverEventArgs e) var recipientId = _messageConverter.GetHeader(headers, MessageConverter.RecipientIdPropertyName); var messageId = e.BasicProperties.MessageId; - _logger.MessageReceived(_exchangeName, nodeId, messageId); + _logger.MessageReceived(_receivingExchangeName, nodeId, messageId); if (senderId == nodeId || (recipientId != null && recipientId != nodeId)) { @@ -222,19 +227,19 @@ private void ConsumerOnReceived(object? sender, BasicDeliverEventArgs e) var nodeMessage = _messageConverter.GetNodeMessage(headers, e.Body); _observableMessages.OnNext(nodeMessage); receivingChannel.BasicAck(e.DeliveryTag, false); - _logger.MessageProcessed(_exchangeName, nodeId, messageId); + _logger.MessageProcessed(_receivingExchangeName, nodeId, messageId); } catch (Exception ex) { receivingChannel.BasicNack(e.DeliveryTag, false, false); - _logger.ErrorProcessMessage(ex, _exchangeName, nodeId, messageId); + _logger.ErrorProcessMessage(ex, _receivingExchangeName, nodeId, messageId); } } } private void ConnectionOnCallbackException(object? sender, CallbackExceptionEventArgs e) { - _logger.ConnectionCallbackError(e.Exception, _exchangeName, _nodeId); + _logger.ConnectionCallbackError(e.Exception, _receivingExchangeName, _nodeId); } private ConnectionFactory CreateConnectionFactory() @@ -253,28 +258,56 @@ private ConnectionFactory CreateConnectionFactory() private void InitializeExchangeName() { - _exchangeName = Configuration.ServiceBusTopic; - if (string.IsNullOrEmpty(_exchangeName)) + var topic = Configuration.ServiceBusTopic; + if (string.IsNullOrEmpty(topic)) + { + _sendingExchangeName = DefaultExchangeName; + _receivingExchangeName = DefaultExchangeName; + } + else { - _exchangeName = DefaultExchangeName; + var separatorIndex = topic.IndexOf(';', StringComparison.Ordinal); + if (separatorIndex > 0 && separatorIndex < topic.Length - 1) + { + _sendingExchangeName = topic.Substring(0, separatorIndex); + _receivingExchangeName = topic.Substring(separatorIndex + 1); + } + else + { + _sendingExchangeName = topic; + _receivingExchangeName = topic; + } } } - private string InitializeTopology() + private void InitializeTopology() { var receivingChannel = _connection!.CreateModel(); - receivingChannel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout); + receivingChannel.ExchangeDeclare(_sendingExchangeName, ExchangeType.Fanout); + receivingChannel.ExchangeDeclare(_receivingExchangeName, ExchangeType.Fanout); + _receivingChannel = receivingChannel; - var queueParameters = new Dictionary() - { - { "message-ttl", 60000 } - }; - var queue = receivingChannel.QueueDeclare(arguments: queueParameters); - var queueName = queue.QueueName; - receivingChannel.QueueBind(queueName, _exchangeName, string.Empty); + var queue = receivingChannel.QueueDeclare(QueuePrefix + _nodeId, false, false, false); + _queueName = queue.QueueName; + receivingChannel.QueueBind(queue.QueueName, _receivingExchangeName, string.Empty); + } - _receivingChannel = receivingChannel; - return queueName; + [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Continue disposing other resources.")] + private void DeleteQueue() + { + if (_queueName != null && _receivingChannel != null) + { + try + { + _receivingChannel.QueueDelete(_queueName); + _queueName = null; + _logger.QueueClosed(_receivingExchangeName, _nodeId); + } + catch (Exception ex) + { + _logger.ErrorClosingQueue(ex, _receivingExchangeName, _nodeId); + } + } } private IBasicProperties CreateBasicProperties(NodeMessage message, IModel model) @@ -283,6 +316,7 @@ private IBasicProperties CreateBasicProperties(NodeMessage message, IModel model properties.MessageId = _guidProvider.NewGuid().ToString(); properties.Type = message.MessageType.ToString(); + properties.Expiration = "60000"; // 1 minute if (message.Data != null) { diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBusLogger.cs b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBusLogger.cs index 13ca5be..a93987d 100644 --- a/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBusLogger.cs +++ b/src/Duracellko.PlanningPoker.RabbitMQ/RabbitServiceBusLogger.cs @@ -42,9 +42,14 @@ internal static class RabbitServiceBusLogger new EventId(BaseEventId + 7, nameof(QueueClosed)), "RabbitMQ queue was closed (Channel: {Channel}, NodeID: {NodeId})"); + private static readonly Action _errorClosingQueue = LoggerMessage.Define( + LogLevel.Error, + new EventId(BaseEventId + 8, nameof(ErrorClosingQueue)), + "Closing RabbitMQ queue failed (Channel: {Channel}, NodeID: {NodeId})"); + private static readonly Action _connectionCallbackError = LoggerMessage.Define( LogLevel.Error, - new EventId(BaseEventId + 8, nameof(ConnectionCallbackError)), + new EventId(BaseEventId + 9, nameof(ConnectionCallbackError)), "RabbitMQ connection callback failed (Channel: {Channel}, NodeID: {NodeId})"); public static void SendMessage(this ILogger logger, string? messageId) @@ -82,6 +87,11 @@ public static void QueueClosed(this ILogger logger, string? channel, string? nod _queueClosed(logger, channel, nodeId, null); } + public static void ErrorClosingQueue(this ILogger logger, Exception exception, string? channel, string? nodeId) + { + _errorClosingQueue(logger, channel, nodeId, exception); + } + public static void ConnectionCallbackError(this ILogger logger, Exception exception, string? channel, string? nodeId) { _connectionCallbackError(logger, channel, nodeId, exception); diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/Resources.Designer.cs b/src/Duracellko.PlanningPoker.RabbitMQ/Resources.Designer.cs index f908d4b..8cf8cc3 100644 --- a/src/Duracellko.PlanningPoker.RabbitMQ/Resources.Designer.cs +++ b/src/Duracellko.PlanningPoker.RabbitMQ/Resources.Designer.cs @@ -68,5 +68,23 @@ internal static string Error_RabbitMQNotInitialized { return ResourceManager.GetString("Error_RabbitMQNotInitialized", resourceCulture); } } + + /// + /// Looks up a localized string similar to RabbitMQ connection is healthy.. + /// + internal static string Health_RabbitHealthy { + get { + return ResourceManager.GetString("Health_RabbitHealthy", resourceCulture); + } + } + + /// + /// Looks up a localized string similar to RabbitMQ is disconnected.. + /// + internal static string Health_RabbitUnhealthy { + get { + return ResourceManager.GetString("Health_RabbitUnhealthy", resourceCulture); + } + } } } diff --git a/src/Duracellko.PlanningPoker.RabbitMQ/Resources.resx b/src/Duracellko.PlanningPoker.RabbitMQ/Resources.resx index b011173..8d2ac6d 100644 --- a/src/Duracellko.PlanningPoker.RabbitMQ/Resources.resx +++ b/src/Duracellko.PlanningPoker.RabbitMQ/Resources.resx @@ -120,4 +120,10 @@ RabbitMQ is not initialized. + + RabbitMQ connection is healthy. + + + RabbitMQ is disconnected. + \ No newline at end of file diff --git a/src/Duracellko.PlanningPoker.Web/Program.cs b/src/Duracellko.PlanningPoker.Web/Program.cs index d4ce3e4..03482f1 100644 --- a/src/Duracellko.PlanningPoker.Web/Program.cs +++ b/src/Duracellko.PlanningPoker.Web/Program.cs @@ -103,7 +103,9 @@ private static void ConfigureServices(IServiceCollection services, IConfiguratio if (planningPokerConfiguration.ServiceBusConnectionString!.StartsWith("RABBITMQ:", StringComparison.Ordinal)) { - services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(sp => sp.GetRequiredService()); + healthChecks.AddCheck("RabbitMQ"); } else if (planningPokerConfiguration.ServiceBusConnectionString.StartsWith("REDIS:", StringComparison.Ordinal)) { From f8b38a6c381543301e48184a76077e6d85015768 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rastislav=20Novotn=C3=BD?= Date: Sun, 18 Aug 2024 22:12:59 +0800 Subject: [PATCH 4/4] Docker container tests with RabbitMQ --- docker/test/RunTests.ps1 | 45 +++---------------- docker/test/compose.yml | 35 +++++++++------ docker/test/redis.conf | 3 -- .../AzurePlanningPokerNodeService.cs | 24 ++++++++-- 4 files changed, 47 insertions(+), 60 deletions(-) delete mode 100644 docker/test/redis.conf diff --git a/docker/test/RunTests.ps1 b/docker/test/RunTests.ps1 index 68ec810..a992ac8 100644 --- a/docker/test/RunTests.ps1 +++ b/docker/test/RunTests.ps1 @@ -5,7 +5,7 @@ Param ( $projectPath = $PSScriptRoot $pesterVersion = '5.6.1' -$redisVersion = '7.4' +$rabbitmqVersion = '3.13' $imageTag = 'local-test' if (![string]::IsNullOrEmpty($PlanningPokerImageTag)) { @@ -13,8 +13,7 @@ if (![string]::IsNullOrEmpty($PlanningPokerImageTag)) { } $composeProjectName = 'planningpoker' -$redisAppPassword = (New-Guid).ToString() -$redisAdminPassword = (New-Guid).ToString() +$rabbitmqPassword = (New-Guid).ToString() $applicationPorts = @(5001, 5002, 5003) function RandomizeApplicationPorts() { @@ -31,45 +30,15 @@ function SetupEnvironmentVariables() { [Parameter(Mandatory = $true)] [int[]] $ApplicationPorts, [Parameter(Mandatory = $true)] - [string] $AppPassword + [string] $RabbitMQPassword ) $env:PLANNINGPOKER_IMAGENAME = 'duracellko/planningpoker:' + $AppImageTag $env:PLANNINGPOKER_APP1_PORT = $ApplicationPorts[0] $env:PLANNINGPOKER_APP2_PORT = $ApplicationPorts[1] $env:PLANNINGPOKER_APP3_PORT = $ApplicationPorts[2] - $env:PLANNINGPOKER_APP_REDIS_PASSWORD = $AppPassword - $env:PLANNINGPOKER_REDIS_VERSION = $redisVersion -} - -function PrepareRedisConfiguration() { - param ( - [Parameter(Mandatory = $true)] - [string] $AppPassword, - [Parameter(Mandatory = $true)] - [string] $AdminPassword - ) - - $configurationPath = Join-Path -Path $projectPath -ChildPath 'redis.conf' - $configuration = Get-Content -Path $configurationPath -Raw - $configuration = $configuration.Replace('${PLANNINGPOKER_ADMIN_PASSWORD}', $AdminPassword) - $configuration = $configuration.Replace('${PLANNINGPOKER_APP_PASSWORD}', $AppPassword) - Set-Content -Path $configurationPath -Value $configuration -NoNewline -} - -function RevertRedisConfiguration() { - param ( - [Parameter(Mandatory = $true)] - [string] $AppPassword, - [Parameter(Mandatory = $true)] - [string] $AdminPassword - ) - - $configurationPath = Join-Path -Path $projectPath -ChildPath 'redis.conf' - $configuration = Get-Content -Path $configurationPath -Raw - $configuration = $configuration.Replace($AdminPassword, '${PLANNINGPOKER_ADMIN_PASSWORD}') - $configuration = $configuration.Replace($AppPassword, '${PLANNINGPOKER_APP_PASSWORD}') - Set-Content -Path $configurationPath -Value $configuration -NoNewline + $env:PLANNINGPOKER_APP_RABBITMQ_PASSWORD = $RabbitMQPassword + $env:PLANNINGPOKER_RABBITMQ_VERSION = $rabbitmqVersion } function ComposeDockerUp() { @@ -118,8 +87,7 @@ $composeFilePath = Join-Path -Path $projectPath -ChildPath 'compose.yml' try { RandomizeApplicationPorts - SetupEnvironmentVariables -AppImageTag $imageTag -ApplicationPorts $applicationPorts -AppPassword $redisAppPassword - PrepareRedisConfiguration -AppPassword $redisAppPassword -AdminPassword $redisAdminPassword + SetupEnvironmentVariables -AppImageTag $imageTag -ApplicationPorts $applicationPorts -RabbitMQPassword $rabbitmqPassword ComposeDockerUp -ComposePath $composeFilePath -ProjectName $composeProjectName @@ -157,6 +125,5 @@ try { } } finally { - RevertRedisConfiguration -AppPassword $redisAppPassword -AdminPassword $redisAdminPassword ComposeDockerDown -ComposePath $composeFilePath -ProjectName $composeProjectName } \ No newline at end of file diff --git a/docker/test/compose.yml b/docker/test/compose.yml index 454d593..17a3f61 100644 --- a/docker/test/compose.yml +++ b/docker/test/compose.yml @@ -2,9 +2,10 @@ services: planningpoker-r1: image: ${PLANNINGPOKER_IMAGENAME:-duracellko/planningpoker:local-test} depends_on: - - redis + rabbitmq: + condition: service_healthy environment: - PlanningPoker__ServiceBusConnectionString: "REDIS:redis,user=planningpoker,password=${PLANNINGPOKER_APP_REDIS_PASSWORD}" + PlanningPoker__ServiceBusConnectionString: "RABBITMQ:amqp://planningpoker:${PLANNINGPOKER_APP_RABBITMQ_PASSWORD}@rabbitmq/" PlanningPoker__InitializationMessageTimeout: 3 PlanningPokerClient__UseHttpClient: true ports: @@ -13,9 +14,10 @@ services: planningpoker-r2: image: ${PLANNINGPOKER_IMAGENAME:-duracellko/planningpoker:local-test} depends_on: - - redis + rabbitmq: + condition: service_healthy environment: - PlanningPoker__ServiceBusConnectionString: "REDIS:redis,user=planningpoker,password=${PLANNINGPOKER_APP_REDIS_PASSWORD}" + PlanningPoker__ServiceBusConnectionString: "RABBITMQ:amqp://planningpoker:${PLANNINGPOKER_APP_RABBITMQ_PASSWORD}@rabbitmq/" PlanningPoker__InitializationMessageTimeout: 3 PlanningPokerClient__UseHttpClient: true ports: @@ -24,19 +26,24 @@ services: planningpoker-r3: image: ${PLANNINGPOKER_IMAGENAME:-duracellko/planningpoker:local-test} depends_on: - - redis + rabbitmq: + condition: service_healthy environment: - PlanningPoker__ServiceBusConnectionString: "REDIS:redis,user=planningpoker,password=${PLANNINGPOKER_APP_REDIS_PASSWORD}" + PlanningPoker__ServiceBusConnectionString: "RABBITMQ:amqp://planningpoker:${PLANNINGPOKER_APP_RABBITMQ_PASSWORD}@rabbitmq/" PlanningPoker__InitializationMessageTimeout: 3 PlanningPokerClient__UseHttpClient: true ports: - "${PLANNINGPOKER_APP3_PORT:-5003}:8080" - redis: - image: redis:${PLANNINGPOKER_REDIS_VERSION:-latest} - volumes: - - type: bind - source: ./redis.conf - target: /usr/local/etc/redis/redis.conf - read_only: true - command: [ "redis-server", "/usr/local/etc/redis/redis.conf" ] + rabbitmq: + image: rabbitmq:${PLANNINGPOKER_RABBITMQ_VERSION:-latest} + environment: + RABBITMQ_DEFAULT_USER: planningpoker + RABBITMQ_DEFAULT_PASS: ${PLANNINGPOKER_APP_RABBITMQ_PASSWORD} + healthcheck: + test: "rabbitmq-diagnostics -q check_running && rabbitmq-diagnostics -q check_local_alarms && rabbitmq-diagnostics -q check_port_listener 5672" + interval: 30s + timeout: 10s + retries: 3 + start_period: 20s + start_interval: 2s diff --git a/docker/test/redis.conf b/docker/test/redis.conf deleted file mode 100644 index e95d6d0..0000000 --- a/docker/test/redis.conf +++ /dev/null @@ -1,3 +0,0 @@ -user admin on allcommands allkeys allchannels >${PLANNINGPOKER_ADMIN_PASSWORD} -user planningpoker on +@connection +@pubsub resetchannels &PlanningPoker >${PLANNINGPOKER_APP_PASSWORD} -user default off diff --git a/src/Duracellko.PlanningPoker.Web/AzurePlanningPokerNodeService.cs b/src/Duracellko.PlanningPoker.Web/AzurePlanningPokerNodeService.cs index 4e234bc..fd422c3 100644 --- a/src/Duracellko.PlanningPoker.Web/AzurePlanningPokerNodeService.cs +++ b/src/Duracellko.PlanningPoker.Web/AzurePlanningPokerNodeService.cs @@ -1,24 +1,40 @@ using System; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; using Duracellko.PlanningPoker.Azure; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; namespace Duracellko.PlanningPoker.Web; public sealed class AzurePlanningPokerNodeService : IHostedService, IDisposable { + private static readonly Action _logErrorStart = LoggerMessage.Define( + LogLevel.Error, + new EventId(0, "ErrorStartAzurePlanningPokerNode"), + "Starting Azure PlanningPoker Node failed."); + private readonly PlanningPokerAzureNode _node; + private readonly ILogger _logger; - public AzurePlanningPokerNodeService(PlanningPokerAzureNode node) + public AzurePlanningPokerNodeService(PlanningPokerAzureNode node, ILogger logger) { _node = node ?? throw new ArgumentNullException(nameof(node)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } - public Task StartAsync(CancellationToken cancellationToken) + [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Log error and continue without Azure node service.")] + public async Task StartAsync(CancellationToken cancellationToken) { - _node.Start(); - return Task.CompletedTask; + try + { + await _node.Start(); + } + catch (Exception ex) + { + _logErrorStart(_logger, ex); + } } public Task StopAsync(CancellationToken cancellationToken)