Skip to content

Commit

Permalink
fix: correct issue where publishes and requests before connection or …
Browse files Browse the repository at this point in the history
…configuration had completed would get lost
  • Loading branch information
arobson committed Feb 17, 2018
1 parent 19a678d commit 1f58c0b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 26 deletions.
1 change: 1 addition & 0 deletions spec/behavior/configuration.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ describe('Configuration', function () {
var Broker = function (conn) {
this.connection = conn;
this.configurations = {};
this.configuring = {};
};

Broker.prototype.addConnection = function () {
Expand Down
6 changes: 4 additions & 2 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ const log = require('./log')('rabbot.configuration');
module.exports = function (Broker) {
Broker.prototype.configure = function (config) {
const emit = this.emit.bind(this);
this.configurations[ config.name || 'default' ] = config;
return new Promise(function (resolve, reject) {
const configName = config.name || 'default';
this.configurations[ configName ] = config;
this.configuring[ configName ] = new Promise(function (resolve, reject) {
function onExchangeError (connection, err) {
log.error('Configuration of %s failed due to an error in one or more exchange settings: %s', connection.name, err);
reject(err);
Expand Down Expand Up @@ -64,5 +65,6 @@ module.exports = function (Broker) {
reject
);
}.bind(this));
return this.configuring[ configName ];
};
};
71 changes: 47 additions & 24 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var Broker = function () {
this.autoNack = false;
this.serializers = serializers;
this.configurations = {};
this.configuring = {};
this.log = log;
_.bindAll(this);
};
Expand Down Expand Up @@ -188,6 +189,7 @@ Broker.prototype.close = function (connectionName = DEFAULT, reset = false) {
if (reset) {
this.connections[ connectionName ].reset();
}
delete this.configuring[ connectionName ];
return connection.close(reset);
} else {
return Promise.resolve(true);
Expand All @@ -203,7 +205,7 @@ Broker.prototype.deleteQueue = function (name, connectionName = DEFAULT) {
};

Broker.prototype.getExchange = function (name, connectionName = DEFAULT) {
return this.connections[ connectionName ].channels[ 'exchange:' + name ];
return this.connections[ connectionName ].channels[ `exchange:${name}` ];
};

Broker.prototype.getQueue = function (name, connectionName = DEFAULT) {
Expand Down Expand Up @@ -269,6 +271,20 @@ Broker.prototype.rejectUnhandled = function () {
unhandledStrategies.onUnhandled = unhandledStrategies.rejectOnUnhandled;
};

Broker.prototype.onExchange = function (exchangeName, connectionName = DEFAULT) {
const promises = [
this.connections[ connectionName ].promise,
this.connections[ connectionName ].promises[`exchange:${exchangeName}`]
];
if (this.configuring[ connectionName ]) {
promises.push(this.configuring[ connectionName ]);
}
return Promise.all(promises)
.then(
() => this.getExchange(exchangeName, connectionName)
);
};

Broker.prototype.onReturned = function (handler) {
returnedStrategies.onReturned = returnedStrategies.customOnReturned = handler;
};
Expand Down Expand Up @@ -303,9 +319,8 @@ Broker.prototype.publish = function (exchangeName, type, message, routingKey, co
options.body = options.body.toString();
}

return this.connections[ connectionName ].promise
.then(() => {
const exchange = this.getExchange(exchangeName, connectionName);
return this.onExchange(exchangeName, connectionName)
.then(exchange => {
if (exchange) {
return exchange.publish(options);
} else {
Expand Down Expand Up @@ -333,32 +348,40 @@ Broker.prototype.request = function (exchangeName, options = {}, notify, connect
const requestId = uuid.v1();
options.messageId = requestId;
options.connectionName = options.connectionName || connectionName;
const connection = this.connections[ options.connectionName ].options;
const exchange = this.getExchange(exchangeName, options.connectionName);
const publishTimeout = options.timeout || exchange.publishTimeout || connection.publishTimeout || 500;
const replyTimeout = options.replyTimeout || exchange.replyTimeout || connection.replyTimeout || (publishTimeout * 2);

return new Promise((resolve, reject) => {
const timeout = setTimeout(function () {
subscription.unsubscribe();
reject(new Error('No reply received within the configured timeout of ' + replyTimeout + ' ms'));
}, replyTimeout);
const subscription = responses.subscribe(requestId, function (message) {
if (message.properties.headers[ 'sequence_end' ]) { // jshint ignore:line
clearTimeout(timeout);
resolve(message);
subscription.unsubscribe();
} else if (notify) {
notify(message);
}

if (!this.connections[ connectionName ]) {
return Promise.reject(new Error(`Request failed - no connection ${connectionName} has been configured`));
}

return this.onExchange(exchangeName, connectionName)
.then(exchange => {
const connection = this.connections[ options.connectionName ].options;
const publishTimeout = options.timeout || exchange.publishTimeout || connection.publishTimeout || 500;
const replyTimeout = options.replyTimeout || exchange.replyTimeout || connection.replyTimeout || (publishTimeout * 2);

return new Promise((resolve, reject) => {
const timeout = setTimeout(function () {
subscription.unsubscribe();
reject(new Error('No reply received within the configured timeout of ' + replyTimeout + ' ms'));
}, replyTimeout);
const subscription = responses.subscribe(requestId, function (message) {
if (message.properties.headers[ 'sequence_end' ]) { // jshint ignore:line
clearTimeout(timeout);
resolve(message);
subscription.unsubscribe();
} else if (notify) {
notify(message);
}
});
this.publish(exchangeName, options);
});
});
this.publish(exchangeName, options);
});
};

Broker.prototype.reset = function () {
this.connections = {};
this.configurations = {};
this.configuring = {};
};

Broker.prototype.retry = function (connectionName = DEFAULT) {
Expand Down

0 comments on commit 1f58c0b

Please sign in to comment.