Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance RabbitMQ to set PrefetchCount #13402

Merged
merged 2 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions docs/en/Background-Jobs-RabbitMq.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,25 +126,31 @@ By default, all the job types use the `Default` RabbitMQ connection.
Configure<AbpRabbitMqBackgroundJobOptions>(options =>
{
options.DefaultQueueNamePrefix = "my_app_jobs.";
options.DefaultDelayedQueueNamePrefix = "my_app_jobs.delayed"
options.PrefetchCount = 1;
options.JobQueues[typeof(EmailSendingArgs)] =
new JobQueueConfiguration(
typeof(EmailSendingArgs),
queueName: "my_app_jobs.emails",
connectionName: "SecondConnection"
connectionName: "SecondConnection",
delayedQueueName:"my_app_jobs.emails.delayed"
);
});
````

* This example sets the default queue name prefix to `my_app_jobs.`. If different applications use the same RabbitMQ server, it would be important to use different prefixes for each application to not consume jobs of each other.
* This example sets the default queue name prefix to `my_app_jobs.` and default delayed queue name prefix to `my_app_jobs.delayed`. If different applications use the same RabbitMQ server, it would be important to use different prefixes for each application to not consume jobs of each other.
* Sets `PrefetchCount` for all queues.
* Also specifies a different connection string for the `EmailSendingArgs`.

`JobQueueConfiguration` class has some additional options in its constructor;

* `queueName`: The queue name that is used for this job. The prefix is not added, so you need to specify the full name of the queue.
* `DelayedQueueName`: The delayed queue name that is used for delayed execution of job. The prefix is not added, so you need to specify the full name of the queue.
* `connectionName`: The RabbitMQ connection name (see the connection configuration above). This is optional and the default value is `Default`.
* `durable` (optional, default: `true`).
* `exclusive` (optional, default: `false`).
* `autoDelete` (optional, default: `false`)
* `autoDelete` (optional, default: `false`).
* `PrefetchCount` (optional, default: null)

See the RabbitMQ documentation if you want to understand the `durable`, `exclusive` and `autoDelete` options better, while most of the times the default configuration is what you want.

Expand Down
3 changes: 2 additions & 1 deletion docs/en/Distributed-Event-Bus-RabbitMQ-Integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,14 @@ Configure<AbpRabbitMqOptions>(options =>
});
````

**Example: Configure the client and exchange names**
**Example: Configure the client, exchange names and prefetchCount**

````csharp
Configure<AbpRabbitMqEventBusOptions>(options =>
{
options.ClientName = "TestApp1";
options.ExchangeName = "TestMessages";
options.PrefetchCount = 1;
});
````

Expand Down
10 changes: 8 additions & 2 deletions docs/zh-Hans/Background-Jobs-RabbitMq.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,25 +126,31 @@ Configure<AbpRabbitMqOptions>(options =>
Configure<AbpRabbitMqBackgroundJobOptions>(options =>
{
options.DefaultQueueNamePrefix = "my_app_jobs.";
options.DefaultDelayedQueueNamePrefix = "my_app_jobs.delayed"
options.PrefetchCount = 1;
options.JobQueues[typeof(EmailSendingArgs)] =
new JobQueueConfiguration(
typeof(EmailSendingArgs),
queueName: "my_app_jobs.emails",
connectionName: "SecondConnection"
connectionName: "SecondConnection",
delayedQueueName:"my_app_jobs.emails.delayed"
);
});
```

- 这个示例将默认的队列名前缀设置为 `my_app_jobs.`,如果多个项目都使用的同一个 RabbitMQ 服务,设置不同的前缀可以避免执行其他项目的后台作业.
- 这个示例将默认的队列名前缀设置为 `my_app_jobs.`并且设置默认的延迟队列名为 `my_app_jobs.delayed`,如果多个项目都使用的同一个 RabbitMQ 服务,设置不同的前缀可以避免执行其他项目的后台作业.
- 设置了预取数量, 用于所有队列.
- 这里还设置了 `EmailSendingArgs` 绑定的 RabbitMQ 连接.

`JobQueueConfiguration` 类的构造函数中,还有一些其他的可选参数.

- `queueName`: 指定后台作业对应的队列名称(全名).
* `DelayedQueueName`: 指定后台延迟执行的作业对于的队列名称(全名).
- `connectionName`: 后台作业对应的 RabbitMQ 连接名称,默认是 `Default`.
- `durable`: 可选参数,默认为 `true`.
- `exclusive`: 可选参数,默认为 `false`.
- `autoDelete`: 可选参数,默认为 `false`.
* `PrefetchCount` (可选参数, 默认为: null)

如果你想要更多地了解 `durable`,`exclusive`,`autoDelete` 的用法,请阅读 RabbitMQ 提供的文档.

Expand Down
3 changes: 2 additions & 1 deletion docs/zh-Hans/Distributed-Event-Bus-RabbitMQ-Integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,14 @@ Configure<AbpRabbitMqOptions>(options =>
});
````

**示例: 配置客户端和交换机名称**
**示例: 配置客户端,交换机名称和预取数量**

````csharp
Configure<AbpRabbitMqEventBusOptions>(options =>
{
options.ClientName = "TestApp1";
options.ExchangeName = "TestMessages";
options.PrefetchCount = 1;
});
````

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class AbpRabbitMqBackgroundJobOptions
/// Default value: "AbpBackgroundJobsDelayed."
/// </summary>
public string DefaultDelayedQueueNamePrefix { get; set; }

public ushort? PrefetchCount { get; set; }

public AbpRabbitMqBackgroundJobOptions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ protected virtual JobQueueConfiguration GetOrCreateJobQueueConfiguration()
new JobQueueConfiguration(
typeof(TArgs),
AbpRabbitMqBackgroundJobOptions.DefaultQueueNamePrefix + JobConfiguration.JobName,
AbpRabbitMqBackgroundJobOptions.DefaultDelayedQueueNamePrefix + JobConfiguration.JobName
AbpRabbitMqBackgroundJobOptions.DefaultDelayedQueueNamePrefix + JobConfiguration.JobName,
prefetchCount: AbpRabbitMqBackgroundJobOptions.PrefetchCount
);
}

Expand Down Expand Up @@ -140,9 +141,14 @@ protected virtual Task EnsureInitializedAsync()

if (AbpBackgroundJobOptions.IsJobExecutionEnabled)
{
if (QueueConfiguration.PrefetchCount.HasValue)
{
ChannelAccessor.Channel.BasicQos(0, QueueConfiguration.PrefetchCount.Value, false);
}

Consumer = new AsyncEventingBasicConsumer(ChannelAccessor.Channel);
Consumer.Received += MessageReceived;

//TODO: What BasicConsume returns?
ChannelAccessor.Channel.BasicConsume(
queue: QueueConfiguration.QueueName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ public JobQueueConfiguration(
string connectionName = null,
bool durable = true,
bool exclusive = false,
bool autoDelete = false)
bool autoDelete = false,
ushort? prefetchCount = null)
: base(
queueName,
durable,
exclusive,
autoDelete)
autoDelete,
prefetchCount)
{
JobArgsType = jobArgsType;
ConnectionName = connectionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class AbpRabbitMqEventBusOptions
public string ExchangeName { get; set; }

public string ExchangeType { get; set; }

public ushort? PrefetchCount { get; set; }

public string GetExchangeTypeOrDefault()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void Initialize()
AbpRabbitMqEventBusOptions.ClientName,
durable: true,
exclusive: false,
autoDelete: false
autoDelete: false,
prefetchCount: AbpRabbitMqEventBusOptions.PrefetchCount
),
AbpRabbitMqEventBusOptions.ConnectionName
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,24 @@ public class QueueDeclareConfiguration
public bool Exclusive { get; set; }

public bool AutoDelete { get; set; }

public ushort? PrefetchCount { get; set; }

public IDictionary<string, object> Arguments { get; }

public QueueDeclareConfiguration(
[NotNull] string queueName,
bool durable = true,
bool exclusive = false,
bool autoDelete = false)
bool autoDelete = false,
ushort? prefetchCount = null)
{
QueueName = queueName;
Durable = durable;
Exclusive = exclusive;
AutoDelete = autoDelete;
Arguments = new Dictionary<string, object>();
PrefetchCount = prefetchCount;
}

public virtual QueueDeclareOk Declare(IModel channel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,14 @@ protected virtual async Task TryCreateChannelAsync()
arguments: Queue.Arguments
);

if (Queue.PrefetchCount.HasValue)
{
Channel.BasicQos(0, Queue.PrefetchCount.Value, false);
}

var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync;

Channel.BasicConsume(
queue: Queue.QueueName,
autoAck: false,
Expand Down