Skip to content

Commit

Permalink
[feature] Allow changing rabbit queue options; Change maxPriority of …
Browse files Browse the repository at this point in the history
…queueconverttask to 6 due to quorum queues; For bug 67127
  • Loading branch information
konovalovsergey committed Oct 24, 2024
1 parent 8814293 commit 6591c78
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 17 deletions.
36 changes: 34 additions & 2 deletions Common/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,43 @@
"url": "amqp://guest:guest@localhost:5672",
"socketOptions": {},
"exchangepubsub": "ds.pubsub",
"queueconverttask": "ds.converttask",
"queuepubsubOptions": {
"autoDelete": true,
"exclusive": true,
"arguments": {
"x-queue-type": "classic"
}
},
"queueconverttask": "ds.converttask6",
"queueconverttaskOptions": {
"durable": true,
"maxPriority": 6,
"arguments": {
"x-queue-type": "classic"
}
},
"queueconvertresponse": "ds.convertresponse",
"queueconvertresponseOptions": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
}
},
"exchangeconvertdead": "ds.exchangeconvertdead",
"queueconvertdead": "ds.convertdead",
"queuedelayed": "ds.delayed"
"queueconvertdeadOptions": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
}
},
"queuedelayed": "ds.delayed",
"queuedelayedOptions": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
}
}
},
"activemq": {
"connectOptions": {
Expand Down
13 changes: 8 additions & 5 deletions Common/sources/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,14 @@ exports.VKEY_TIME_INCORRECT = -125;
exports.EDITOR_CHANGES = -160;
exports.PASSWORD = -180;

exports.QUEUE_PRIORITY_VERY_LOW = 0;
exports.QUEUE_PRIORITY_LOW = 1;
exports.QUEUE_PRIORITY_NORMAL = 2;
exports.QUEUE_PRIORITY_HIGH = 3;
exports.QUEUE_PRIORITY_VERY_HIGH = 4;
//Quorum queues internally only support two priorities: high and normal.
//Messages without a priority set will be mapped to normal as will priorities 0 - 4.
//Messages with a priority higher than 4 will be mapped to high.
exports.QUEUE_PRIORITY_VERY_LOW = 2;
exports.QUEUE_PRIORITY_LOW = 3;
exports.QUEUE_PRIORITY_NORMAL = 4;
exports.QUEUE_PRIORITY_HIGH = 5;
exports.QUEUE_PRIORITY_VERY_HIGH = 6;

exports.EDITOR_TYPE_WORD = 0;
exports.EDITOR_TYPE_SPREADSHEET = 1;
Expand Down
19 changes: 10 additions & 9 deletions Common/sources/taskqueueRabbitMQ.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ const cfgQueueType = config.get('queue.type');
var cfgVisibilityTimeout = config.get('queue.visibilityTimeout');
var cfgQueueRetentionPeriod = config.get('queue.retentionPeriod');
var cfgRabbitQueueConvertTask = config.get('rabbitmq.queueconverttask');
var cfgRabbitQueueConvertTaskOptions = config.get('rabbitmq.queueconverttaskOptions');
var cfgRabbitQueueConvertResponse = config.get('rabbitmq.queueconvertresponse');
var cfgRabbitQueueConvertResponseOptions = config.get('rabbitmq.queueconvertresponseOptions');
var cfgRabbitExchangeConvertDead = config.get('rabbitmq.exchangeconvertdead');
var cfgRabbitQueueConvertDead = config.get('rabbitmq.queueconvertdead');
var cfgRabbitQueueConvertDeadOptions = config.get('rabbitmq.queueconvertdeadOptions');
var cfgRabbitQueueDelayed = config.get('rabbitmq.queuedelayed');
var cfgRabbitQueueDelayedOptions = config.get('rabbitmq.queuedelayedOptions');
var cfgActiveQueueConvertTask = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconverttask');
var cfgActiveQueueConvertResponse = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconvertresponse');
var cfgActiveQueueConvertDead = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconvertdead');
Expand All @@ -72,24 +76,22 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
});
taskqueue.connection = conn;
var bAssertTaskQueue = false;
var optionsTaskQueue = {
durable: true,
maxPriority: constants.QUEUE_PRIORITY_VERY_HIGH,
let optionsTaskQueueDefault = {
messageTtl: cfgQueueRetentionPeriod * 1000,
deadLetterExchange: cfgRabbitExchangeConvertDead
};
let optionsTaskQueue = {...optionsTaskQueueDefault, ...cfgRabbitQueueConvertTaskOptions};
if (isAddTask) {
taskqueue.channelConvertTask = yield rabbitMQCore.createConfirmChannelPromise(conn);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTask, cfgRabbitQueueConvertTask,
optionsTaskQueue);
bAssertTaskQueue = true;
}
var bAssertResponseQueue = false;
var optionsResponseQueue = {durable: true};
if (isAddResponse) {
taskqueue.channelConvertResponse = yield rabbitMQCore.createConfirmChannelPromise(conn);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponse, cfgRabbitQueueConvertResponse,
optionsResponseQueue);
cfgRabbitQueueConvertResponseOptions);
bAssertResponseQueue = true;
}
var optionsReceive = {noAck: false};
Expand Down Expand Up @@ -131,19 +133,18 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
}, optionsReceive);
}
if (isAddDelayed) {
let optionsDelayedQueue = {
durable: true,
let optionsDelayedQueueDefault = {
deadLetterExchange: cfgRabbitExchangeConvertDead
};
let optionsDelayedQueue = {...optionsDelayedQueueDefault, ...cfgRabbitQueueDelayedOptions};
taskqueue.channelDelayed = yield rabbitMQCore.createConfirmChannelPromise(conn);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelDelayed, cfgRabbitQueueDelayed, optionsDelayedQueue);
}
if (isEmitDead) {
taskqueue.channelConvertDead = yield rabbitMQCore.createChannelPromise(conn);
yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead, 'fanout',
optionsExchnangeDead);
var queue = yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertDead, cfgRabbitQueueConvertDead,
{durable: true});
var queue = yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertDead, cfgRabbitQueueConvertDead, cfgRabbitQueueConvertDeadOptions);

taskqueue.channelConvertDead.bindQueue(queue, cfgRabbitExchangeConvertDead, '');
yield rabbitMQCore.consumePromise(taskqueue.channelConvertDead, queue, function(message) {
Expand Down
3 changes: 2 additions & 1 deletion DocService/sources/pubsubRabbitMQ.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var activeMQCore = require('./../../Common/sources/activeMQCore');

const cfgQueueType = config.get('queue.type');
var cfgRabbitExchangePubSub = config.get('rabbitmq.exchangepubsub');
const cfgRabbitQueuePubsubOptions = config.get('rabbitmq.queuepubsubOptions');
var cfgActiveTopicPubSub = constants.ACTIVEMQ_TOPIC_PREFIX + config.get('activemq.topicpubsub');

const optionsExchange = {durable: true};
Expand All @@ -64,7 +65,7 @@ function initRabbit(pubsub, callback) {
'fanout', {durable: true});

pubsub.channelReceive = yield rabbitMQCore.createChannelPromise(conn);
var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, '', {autoDelete: true, exclusive: true});
var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, '', cfgRabbitQueuePubsubOptions);
pubsub.channelReceive.bindQueue(queue, cfgRabbitExchangePubSub, '');
yield rabbitMQCore.consumePromise(pubsub.channelReceive, queue, function (message) {
if(null != pubsub.channelReceive){
Expand Down

0 comments on commit 6591c78

Please sign in to comment.