Skip to content

Commit

Permalink
[bug] Refactoring rabbitmq config; For bug 67127
Browse files Browse the repository at this point in the history
  • Loading branch information
konovalovsergey committed Nov 7, 2024
1 parent 619f4e2 commit cb5b51d
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 65 deletions.
77 changes: 49 additions & 28 deletions Common/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,42 +114,63 @@
"rabbitmq": {
"url": "amqp://guest:guest@localhost:5672",
"socketOptions": {},
"exchangepubsub": "ds.pubsub",
"queuepubsubOptions": {
"autoDelete": true,
"exclusive": true,
"arguments": {
"x-queue-type": "classic"
"exchangepubsub": {
"name": "ds.pubsub",
"options": {
"durable": true
}
},
"queueconverttask": "ds.converttask6",
"queueconverttaskOptions": {
"durable": true,
"maxPriority": 6,
"arguments": {
"x-queue-type": "classic"
"queuepubsub": {
"name": "",
"options": {
"autoDelete": true,
"exclusive": true,
"arguments": {
"x-queue-type": "classic"
}
}
},
"queueconvertresponse": "ds.convertresponse",
"queueconvertresponseOptions": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
"queueconverttask": {
"name": "ds.converttask6",
"options": {
"durable": true,
"maxPriority": 6,
"arguments": {
"x-queue-type": "classic"
}
}
},
"exchangeconvertdead": "ds.exchangeconvertdead",
"queueconvertdead": "ds.convertdead",
"queueconvertdeadOptions": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
"queueconvertresponse": {
"name": "ds.convertresponse",
"options": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
}
}
},
"queuedelayed": "ds.delayed",
"queuedelayedOptions": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
"exchangeconvertdead": {
"name": "ds.exchangeconvertdead",
"options": {
"durable": true
}
},
"queueconvertdead": {
"name": "ds.convertdead",
"options": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
}
}
},
"queuedelayed": {
"name": "ds.delayed",
"options": {
"durable": true,
"arguments": {
"x-queue-type": "classic"
}
}
}
},
Expand Down
50 changes: 22 additions & 28 deletions Common/sources/taskqueueRabbitMQ.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ var utils = require('./utils');
var constants = require('./constants');
var rabbitMQCore = require('./rabbitMQCore');
var activeMQCore = require('./activeMQCore');
const logger = require('./logger');
const commonDefines = require('./commondefines');
const operationContext = require('./operationContext');

Expand All @@ -48,20 +47,15 @@ const cfgQueueType = config.get('queue.type');
var cfgVisibilityTimeout = config.get('queue.visibilityTimeout');
var cfgQueueRetentionPeriod = config.get('queue.retentionPeriod');
var cfgRabbitQueueConvertTask = config.get('rabbitmq.queueconverttask');
var cfgRabbitQueueConvertTaskOptions = config.get('rabbitmq.queueconverttaskOptions');
var cfgRabbitQueueConvertResponse = config.get('rabbitmq.queueconvertresponse');
var cfgRabbitQueueConvertResponseOptions = config.get('rabbitmq.queueconvertresponseOptions');
var cfgRabbitExchangeConvertDead = config.get('rabbitmq.exchangeconvertdead');
var cfgRabbitQueueConvertDead = config.get('rabbitmq.queueconvertdead');
var cfgRabbitQueueConvertDeadOptions = config.get('rabbitmq.queueconvertdeadOptions');
var cfgRabbitQueueDelayed = config.get('rabbitmq.queuedelayed');
var cfgRabbitQueueDelayedOptions = config.get('rabbitmq.queuedelayedOptions');
var cfgActiveQueueConvertTask = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconverttask');
var cfgActiveQueueConvertResponse = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconvertresponse');
var cfgActiveQueueConvertDead = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconvertdead');
var cfgActiveQueueDelayed = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queuedelayed');

const optionsExchnangeDead = {durable: true};
function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, isEmitDead, isAddDelayed, callback) {
return co(function* () {
var e = null;
Expand All @@ -78,31 +72,31 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
var bAssertTaskQueue = false;
let optionsTaskQueueDefault = {
messageTtl: cfgQueueRetentionPeriod * 1000,
deadLetterExchange: cfgRabbitExchangeConvertDead
deadLetterExchange: cfgRabbitExchangeConvertDead.name
};
let optionsTaskQueue = {...optionsTaskQueueDefault, ...cfgRabbitQueueConvertTaskOptions};
let optionsTaskQueue = {...optionsTaskQueueDefault, ...cfgRabbitQueueConvertTask.options};
if (isAddTask) {
taskqueue.channelConvertTask = yield rabbitMQCore.createConfirmChannelPromise(conn);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTask, cfgRabbitQueueConvertTask,
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTask, cfgRabbitQueueConvertTask.name,
optionsTaskQueue);
bAssertTaskQueue = true;
}
var bAssertResponseQueue = false;
if (isAddResponse) {
taskqueue.channelConvertResponse = yield rabbitMQCore.createConfirmChannelPromise(conn);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponse, cfgRabbitQueueConvertResponse,
cfgRabbitQueueConvertResponseOptions);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponse, cfgRabbitQueueConvertResponse.name,
cfgRabbitQueueConvertResponse.options);
bAssertResponseQueue = true;
}
var optionsReceive = {noAck: false};
if (isAddTaskReceive) {
taskqueue.channelConvertTaskReceive = yield rabbitMQCore.createChannelPromise(conn);
taskqueue.channelConvertTaskReceive.prefetch(1);
if (!bAssertTaskQueue) {
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask,
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask.name,
optionsTaskQueue);
}
yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask,
yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask.name,
function (message) {
co(function* () {
let ack = function() {
Expand All @@ -120,10 +114,10 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
if (isAddResponseReceive) {
taskqueue.channelConvertResponseReceive = yield rabbitMQCore.createChannelPromise(conn);
if (!bAssertResponseQueue) {
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse,
optionsResponseQueue);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse.name,
cfgRabbitQueueConvertResponse.options);
}
yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse,
yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse.name,
function (message) {
if (message) {
taskqueue.emit('response', message.content.toString(), function() {
Expand All @@ -134,19 +128,19 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd
}
if (isAddDelayed) {
let optionsDelayedQueueDefault = {
deadLetterExchange: cfgRabbitExchangeConvertDead
deadLetterExchange: cfgRabbitExchangeConvertDead.name
};
let optionsDelayedQueue = {...optionsDelayedQueueDefault, ...cfgRabbitQueueDelayedOptions};
let optionsDelayedQueue = {...optionsDelayedQueueDefault, ...cfgRabbitQueueDelayed.options};
taskqueue.channelDelayed = yield rabbitMQCore.createConfirmChannelPromise(conn);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelDelayed, cfgRabbitQueueDelayed, optionsDelayedQueue);
yield rabbitMQCore.assertQueuePromise(taskqueue.channelDelayed, cfgRabbitQueueDelayed.name, optionsDelayedQueue);
}
if (isEmitDead) {
taskqueue.channelConvertDead = yield rabbitMQCore.createChannelPromise(conn);
yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead, 'fanout',
optionsExchnangeDead);
var queue = yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertDead, cfgRabbitQueueConvertDead, cfgRabbitQueueConvertDeadOptions);
yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead.name, 'fanout',
cfgRabbitExchangeConvertDead.options);
var queue = yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertDead, cfgRabbitQueueConvertDead.name, cfgRabbitQueueConvertDead.options);

taskqueue.channelConvertDead.bindQueue(queue, cfgRabbitExchangeConvertDead, '');
taskqueue.channelConvertDead.bindQueue(queue, cfgRabbitExchangeConvertDead.name, '');
yield rabbitMQCore.consumePromise(taskqueue.channelConvertDead, queue, function(message) {
if (null != taskqueue.channelConvertDead) {
if (message) {
Expand Down Expand Up @@ -370,7 +364,7 @@ function addTaskRabbit(taskqueue, content, priority, callback, opt_expiration, o
if (undefined !== opt_headers) {
options.headers = opt_headers;
}
taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask, content, options, callback);
taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask.name, content, options, callback);
}
function addTaskActive(taskqueue, content, priority, callback, opt_expiration, opt_headers) {
var msg = {durable: true, priority: priority, body: content, ttl: cfgQueueRetentionPeriod * 1000};
Expand Down Expand Up @@ -402,7 +396,7 @@ function addTaskString(taskqueue, task, priority, opt_expiration, opt_headers) {
}
function addResponseRabbit(taskqueue, content, callback) {
var options = {persistent: true};
taskqueue.channelConvertResponse.sendToQueue(cfgRabbitQueueConvertResponse, content, options, callback);
taskqueue.channelConvertResponse.sendToQueue(cfgRabbitQueueConvertResponse.name, content, options, callback);
}
function addResponseActive(taskqueue, content, callback) {
var msg = {durable: true, body: content};
Expand All @@ -419,7 +413,7 @@ function closeActive(conn) {
}
function addDelayedRabbit(taskqueue, content, ttl, callback) {
var options = {persistent: true, expiration: ttl.toString()};
taskqueue.channelDelayed.sendToQueue(cfgRabbitQueueDelayed, content, options, callback);
taskqueue.channelDelayed.sendToQueue(cfgRabbitQueueDelayed.name, content, options, callback);
}
function addDelayedActive(taskqueue, content, ttl, callback) {
var msg = {durable: true, body: content, ttl: ttl};
Expand All @@ -434,8 +428,8 @@ function healthCheckRabbit(taskqueue) {
if (!taskqueue.channelConvertDead) {
return false;
}
const exchange = yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead,
'fanout', optionsExchnangeDead);
const exchange = yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead.name,
'fanout', cfgRabbitExchangeConvertDead.options);
return !!exchange;
});
}
Expand Down
17 changes: 8 additions & 9 deletions DocService/sources/pubsubRabbitMQ.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ var rabbitMQCore = require('./../../Common/sources/rabbitMQCore');
var activeMQCore = require('./../../Common/sources/activeMQCore');

const cfgQueueType = config.get('queue.type');
var cfgRabbitExchangePubSub = config.get('rabbitmq.exchangepubsub');
const cfgRabbitQueuePubsubOptions = config.get('rabbitmq.queuepubsubOptions');
const cfgRabbitExchangePubSub = config.get('rabbitmq.exchangepubsub');
const cfgRabbitQueuePubsub = config.get('rabbitmq.queuepubsub');
var cfgActiveTopicPubSub = constants.ACTIVEMQ_TOPIC_PREFIX + config.get('activemq.topicpubsub');

const optionsExchange = {durable: true};
function initRabbit(pubsub, callback) {
return co(function* () {
var e = null;
Expand All @@ -61,12 +60,12 @@ function initRabbit(pubsub, callback) {
});
pubsub.connection = conn;
pubsub.channelPublish = yield rabbitMQCore.createChannelPromise(conn);
pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub,
'fanout', {durable: true});
pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub.name,
'fanout', cfgRabbitExchangePubSub.options);

pubsub.channelReceive = yield rabbitMQCore.createChannelPromise(conn);
var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, '', cfgRabbitQueuePubsubOptions);
pubsub.channelReceive.bindQueue(queue, cfgRabbitExchangePubSub, '');
var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, cfgRabbitQueuePubsub.name, cfgRabbitQueuePubsub.options);
pubsub.channelReceive.bindQueue(queue, cfgRabbitExchangePubSub.name, '');
yield rabbitMQCore.consumePromise(pubsub.channelReceive, queue, function (message) {
if(null != pubsub.channelReceive){
if (message) {
Expand Down Expand Up @@ -190,8 +189,8 @@ function healthCheckRabbit(pubsub) {
if (!pubsub.channelPublish) {
return false;
}
const exchange = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub,
'fanout', optionsExchange);
const exchange = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub.name,
'fanout', cfgRabbitExchangePubSub.options);
return !!exchange;
});
}
Expand Down

0 comments on commit cb5b51d

Please sign in to comment.