Skip to content

Commit

Permalink
Merge pull request #7071 from abpframework/liangshiwei/rabbitmq-async
Browse files Browse the repository at this point in the history
Always use async consumer implementations
  • Loading branch information
maliming authored Jan 7, 2021
2 parents 9c1e24e + 0adc4af commit b4bfa11
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public ConnectionPool(IOptions<AbpRabbitMqOptions> options)

public virtual IConnection Get(string connectionName = null)
{
connectionName = connectionName
?? RabbitMqConnections.DefaultConnectionName;
connectionName ??= RabbitMqConnections.DefaultConnectionName;

return Connections.GetOrAdd(
connectionName,
Expand Down Expand Up @@ -58,4 +57,4 @@ public void Dispose()
Connections.Clear();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Volo.Abp.RabbitMQ
public class RabbitMqConnections : Dictionary<string, ConnectionFactory>
{
public const string DefaultConnectionName = "Default";

[NotNull]
public ConnectionFactory Default
{
Expand All @@ -19,7 +19,7 @@ public ConnectionFactory Default

public RabbitMqConnections()
{
Default = new ConnectionFactory();
Default = new ConnectionFactory() { DispatchConsumersAsync = true };
}

public ConnectionFactory GetOrDefault(string connectionName)
Expand All @@ -32,4 +32,4 @@ public ConnectionFactory GetOrDefault(string connectionName)
return Default;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,38 +143,33 @@ protected virtual async Task TryCreateChannelAsync()

try
{
var channel = ConnectionPool
Channel = ConnectionPool
.Get(ConnectionName)
.CreateModel();
channel.ExchangeDeclare(
Channel.ExchangeDeclare(
exchange: Exchange.ExchangeName,
type: Exchange.Type,
durable: Exchange.Durable,
autoDelete: Exchange.AutoDelete,
arguments: Exchange.Arguments
);

channel.QueueDeclare(
Channel.QueueDeclare(
queue: Queue.QueueName,
durable: Queue.Durable,
exclusive: Queue.Exclusive,
autoDelete: Queue.AutoDelete,
arguments: Queue.Arguments
);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, basicDeliverEventArgs) =>
{
await HandleIncomingMessageAsync(channel, basicDeliverEventArgs);
};
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync;

channel.BasicConsume(
Channel.BasicConsume(
queue: Queue.QueueName,
autoAck: false,
consumer: consumer
);

Channel = channel;
}
catch (Exception ex)
{
Expand All @@ -183,16 +178,16 @@ protected virtual async Task TryCreateChannelAsync()
}
}

protected virtual async Task HandleIncomingMessageAsync(IModel channel, BasicDeliverEventArgs basicDeliverEventArgs)
protected virtual async Task HandleIncomingMessageAsync(object sender, BasicDeliverEventArgs basicDeliverEventArgs)
{
try
{
foreach (var callback in Callbacks)
{
await callback(channel, basicDeliverEventArgs);
await callback(Channel, basicDeliverEventArgs);
}

channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false);
Channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
Expand Down

0 comments on commit b4bfa11

Please sign in to comment.