diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ/RMQMessageGatewayConnectionPool.cs b/src/Paramore.Brighter.MessagingGateway.RMQ/RMQMessageGatewayConnectionPool.cs index c74942d678..0f50d60936 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ/RMQMessageGatewayConnectionPool.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ/RMQMessageGatewayConnectionPool.cs @@ -26,6 +26,7 @@ THE SOFTWARE. */ using System.Collections.Generic; using Paramore.Brighter.MessagingGateway.RMQ.Logging; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; namespace Paramore.Brighter.MessagingGateway.RMQ { @@ -80,9 +81,19 @@ public void ResetConnection(ConnectionFactory connectionFactory) lock (s_lock) { - var connection = s_connectionPool[connectionId]; TryRemoveConnection(connectionId); - CreateConnection(connectionFactory); + + try + { + CreateConnection(connectionFactory); + } + catch (BrokerUnreachableException exception) + { + s_logger.Value.ErrorException( + "RMQMessagingGateway: Failed to reset connection to Rabbit MQ endpoint {0}", + exception, + connectionFactory.Endpoint); + } } } diff --git a/src/Paramore.Brighter.ServiceActivator/MessagePumpBase.cs b/src/Paramore.Brighter.ServiceActivator/MessagePumpBase.cs index 03ee3cefc9..2dd8c3448c 100644 --- a/src/Paramore.Brighter.ServiceActivator/MessagePumpBase.cs +++ b/src/Paramore.Brighter.ServiceActivator/MessagePumpBase.cs @@ -62,13 +62,13 @@ public async Task Run() catch (ChannelFailureException ex) when (ex.InnerException is BrokenCircuitException) { _logger.Value.WarnFormat("MessagePump: BrokenCircuitException messages from {1} on thread # {0}", Thread.CurrentThread.ManagedThreadId, Channel.Name); - Task.Delay(1000).Wait(); + await Task.Delay(1000); continue; } catch (ChannelFailureException) { _logger.Value.WarnFormat("MessagePump: ChannelFailureException messages from {1} on thread # {0}", Thread.CurrentThread.ManagedThreadId, Channel.Name); - Task.Delay(1000).Wait(); + await Task.Delay(1000); continue; } catch (Exception exception) @@ -85,7 +85,7 @@ public async Task Run() // empty queue if (message.Header.MessageType == MessageType.MT_NONE) { - Task.Delay(500).Wait(); + await Task.Delay(500); continue; } diff --git a/tests/Paramore.Brighter.Tests/MessagingGateway/rmq/When_resetting_a_connection_that_does_not_exist.cs b/tests/Paramore.Brighter.Tests/MessagingGateway/rmq/When_resetting_a_connection_that_does_not_exist.cs new file mode 100644 index 0000000000..751da85060 --- /dev/null +++ b/tests/Paramore.Brighter.Tests/MessagingGateway/rmq/When_resetting_a_connection_that_does_not_exist.cs @@ -0,0 +1,54 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2014 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using FluentAssertions; +using Paramore.Brighter.MessagingGateway.RMQ; +using RabbitMQ.Client; +using Xunit; + +namespace Paramore.Brighter.Tests.MessagingGateway.RMQ +{ + [Collection("RMQ")] + [Trait("Category", "RMQ")] + public class RMQMessageGatewayConnectionPoolResetConnectionDoesNotExist + { + private readonly RMQMessageGatewayConnectionPool _connectionPool; + + public RMQMessageGatewayConnectionPoolResetConnectionDoesNotExist() + { + _connectionPool = new RMQMessageGatewayConnectionPool("MyConnectionName", 7); + } + + [Fact] + public void When_resetting_a_connection_that_does_not_exist() + { + var connectionFactory = new ConnectionFactory {HostName = "invalidhost"}; + + Action resetConnection = () => _connectionPool.ResetConnection(connectionFactory); + + resetConnection.Should().NotThrow(); + } + } +} diff --git a/tests/Paramore.Brighter.Tests/MessagingGateway/rmq/When_resetting_a_connection_that_exists.cs b/tests/Paramore.Brighter.Tests/MessagingGateway/rmq/When_resetting_a_connection_that_exists.cs new file mode 100644 index 0000000000..460844af5c --- /dev/null +++ b/tests/Paramore.Brighter.Tests/MessagingGateway/rmq/When_resetting_a_connection_that_exists.cs @@ -0,0 +1,59 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2014 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using FakeItEasy; +using FluentAssertions; +using Paramore.Brighter.MessagingGateway.RMQ; +using RabbitMQ.Client; +using Xunit; + +namespace Paramore.Brighter.Tests.MessagingGateway.RMQ +{ + [Collection("RMQ")] + [Trait("Category", "RMQ")] + public class RMQMessageGatewayConnectionPoolResetConnectionExists + { + private readonly RMQMessageGatewayConnectionPool _connectionPool; + private readonly IConnection _originalConnection; + + public RMQMessageGatewayConnectionPoolResetConnectionExists() + { + _connectionPool = new RMQMessageGatewayConnectionPool("MyConnectionName", 7); + + var connectionFactory = new ConnectionFactory { HostName = "localhost" }; + + _originalConnection = _connectionPool.GetConnection(connectionFactory); + } + + [Fact] + public void When_resetting_a_connection_that_exists() + { + var connectionFactory = new ConnectionFactory{HostName = "localhost"}; + + _connectionPool.ResetConnection(connectionFactory); + + _connectionPool.GetConnection(connectionFactory).Should().NotBeSameAs(_originalConnection); + } + } +}