diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs index 08d9d480367..5856f7bb16b 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs @@ -22,8 +22,7 @@ public ConnectionPool(IOptions options) public virtual IConnection Get(string connectionName = null) { - connectionName = connectionName - ?? RabbitMqConnections.DefaultConnectionName; + connectionName ??= RabbitMqConnections.DefaultConnectionName; return Connections.GetOrAdd( connectionName, @@ -58,4 +57,4 @@ public void Dispose() Connections.Clear(); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs index a30db4c96a4..f2b5168d82f 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs @@ -9,7 +9,7 @@ namespace Volo.Abp.RabbitMQ public class RabbitMqConnections : Dictionary { public const string DefaultConnectionName = "Default"; - + [NotNull] public ConnectionFactory Default { @@ -19,7 +19,7 @@ public ConnectionFactory Default public RabbitMqConnections() { - Default = new ConnectionFactory(); + Default = new ConnectionFactory() { DispatchConsumersAsync = true }; } public ConnectionFactory GetOrDefault(string connectionName) @@ -32,4 +32,4 @@ public ConnectionFactory GetOrDefault(string connectionName) return Default; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs index fb1b0c7fe21..b94663d8860 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs @@ -143,10 +143,10 @@ protected virtual async Task TryCreateChannelAsync() try { - var channel = ConnectionPool + Channel = ConnectionPool .Get(ConnectionName) .CreateModel(); - channel.ExchangeDeclare( + Channel.ExchangeDeclare( exchange: Exchange.ExchangeName, type: Exchange.Type, durable: Exchange.Durable, @@ -154,7 +154,7 @@ protected virtual async Task TryCreateChannelAsync() arguments: Exchange.Arguments ); - channel.QueueDeclare( + Channel.QueueDeclare( queue: Queue.QueueName, durable: Queue.Durable, exclusive: Queue.Exclusive, @@ -162,19 +162,14 @@ protected virtual async Task TryCreateChannelAsync() arguments: Queue.Arguments ); - var consumer = new EventingBasicConsumer(channel); - consumer.Received += async (model, basicDeliverEventArgs) => - { - await HandleIncomingMessageAsync(channel, basicDeliverEventArgs); - }; + var consumer = new AsyncEventingBasicConsumer(Channel); + consumer.Received += HandleIncomingMessageAsync; - channel.BasicConsume( + Channel.BasicConsume( queue: Queue.QueueName, autoAck: false, consumer: consumer ); - - Channel = channel; } catch (Exception ex) { @@ -183,16 +178,16 @@ protected virtual async Task TryCreateChannelAsync() } } - protected virtual async Task HandleIncomingMessageAsync(IModel channel, BasicDeliverEventArgs basicDeliverEventArgs) + protected virtual async Task HandleIncomingMessageAsync(object sender, BasicDeliverEventArgs basicDeliverEventArgs) { try { foreach (var callback in Callbacks) { - await callback(channel, basicDeliverEventArgs); + await callback(Channel, basicDeliverEventArgs); } - channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false); + Channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false); } catch (Exception ex) {