Skip to content

Commit

Permalink
add message size limit error (#124)
Browse files Browse the repository at this point in the history
* add message size limit error

* remove console.log

* bug fix

* fix test

* version 2.5.4
  • Loading branch information
andkom authored Dec 23, 2019
1 parent 1bd83f5 commit 0bd4add
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 26 deletions.
17 changes: 13 additions & 4 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ class Amqp {
if (throttle) {
await throttle();
}
log.trace('Pushing to exchange=%s, routingKey=%s, data=%j, '
+ 'options=%j', exchangeName, routingKey, payload, options);
const buffer = Buffer.from(payload);

log.trace('Pushing to exchange=%s, routingKey=%s, messageSize=%d, options=%j',
exchangeName, routingKey, buffer.length, options);
log.debug('Current memory usage: %s Mb', process.memoryUsage().heapUsed / 1048576);

const result = await this.publishMessage(exchangeName, routingKey, Buffer.from(payload), options, 0);
const result = await this.publishMessage(exchangeName, routingKey, buffer, options, 0);
if (this.settings.PROCESS_AMQP_DRAIN) {
if (result) {
return result;
Expand Down Expand Up @@ -138,7 +140,7 @@ class Amqp {
if (iteration < settings.AMQP_PUBLISH_RETRY_ATTEMPTS) {
return await this.publishMessage(exchangeName, routingKey, payloadBuffer, options, iteration);
} else {
throw new Error(`failed on publishing ${options.headers.messageId} message to MQ: ` + error);
throw new Error(`Failed on publishing ${options.headers.messageId} message to MQ: ` + error);
}
}

Expand All @@ -150,6 +152,13 @@ class Amqp {
data.headers = filterMessageHeaders(data.headers);
const encryptedData = encryptor.encryptMessageContent(data);

if (encryptedData.length > settings.OUTGOING_MESSAGE_SIZE_LIMIT) {
const error = new Error(`Outgoing message size ${encryptedData.length}`
+ ` exceeds limit of ${settings.OUTGOING_MESSAGE_SIZE_LIMIT}.`);
log.error(error);
throw error;
}

return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle);
}

Expand Down
6 changes: 5 additions & 1 deletion lib/sailor.js
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@ class Sailor {
});
}

return self.amqpConnection.sendData(data, props, self.throttles.data);
try {
return await self.amqpConnection.sendData(data, props, self.throttles.data);
} catch (err) {
return onError(err);
}
}

async function onHttpReply(reply) {
Expand Down
3 changes: 2 additions & 1 deletion lib/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ function readFrom(envVars) {
RATE_INTERVAL: 100, // 100ms
PROCESS_AMQP_DRAIN: true,
AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms
AMQP_PUBLISH_RETRY_ATTEMPTS: 10
AMQP_PUBLISH_RETRY_ATTEMPTS: 10,
OUTGOING_MESSAGE_SIZE_LIMIT: 10485760
};

if (envVars.ELASTICIO_ADDITIONAL_VARS_FOR_HEADERS) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "elasticio-sailor-nodejs",
"description": "The official elastic.io library for bootstrapping and executing for Node.js connectors",
"version": "2.5.3",
"version": "2.5.4",
"main": "run.js",
"scripts": {
"lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js",
Expand Down
48 changes: 29 additions & 19 deletions spec/amqp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ describe('AMQP', () => {

it('Should send message to outgoing channel when process data', () => {
const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -109,10 +109,9 @@ describe('AMQP', () => {

it('Should send message async to outgoing channel when process data', done => {
const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['waitForConfirms', 'on']);
amqp.publishChannel.publish = () => true;
spyOn(amqp.publishChannel, 'publish').andReturn(true);
amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -165,9 +164,25 @@ describe('AMQP', () => {
}, done);
});

it('Should throw error when message size exceeds limit', (done) => {
const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const body = 'a'.repeat(settings.OUTGOING_MESSAGE_SIZE_LIMIT + 1);
const headers = {};

amqp.sendData({ body }, { headers })
.then(() => done.fail('Exception should be thrown'))
.catch(err => {
expect(err.message).toEqual('Outgoing message size 13981056 exceeds limit of 10485760.');
expect(amqp.publishChannel.publish).not.toHaveBeenCalled();
done();
});
});

it('Should sendHttpReply to outgoing channel using routing key from headers when process data', () => {
const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const msg = {
statusCode: 200,
Expand Down Expand Up @@ -204,7 +219,7 @@ describe('AMQP', () => {

it('Should throw error in sendHttpReply if reply_to header not found', done => {
const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const msg = {
statusCode: 200,
Expand All @@ -224,7 +239,7 @@ describe('AMQP', () => {
}
});
}
test().then(() => done(new Error('should throw')),(err) => {
test().then(() => done(new Error('should throw')), () => {
expect(amqp.publishChannel.publish).not.toHaveBeenCalled();
done();
});
Expand All @@ -233,7 +248,7 @@ describe('AMQP', () => {

it('Should send message to outgoing channel using routing key from headers when process data', () => {
const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const msg = {
headers: {
Expand Down Expand Up @@ -278,7 +293,7 @@ describe('AMQP', () => {

it('Should send message to errors when process error', () => {
const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -332,7 +347,7 @@ describe('AMQP', () => {
};

const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -379,18 +394,14 @@ describe('AMQP', () => {
}
});

console.log('AND BACK');
//eslint-disable-next-line max-len
console.log(encryptor.decryptMessageContent('+PAlXNRj+5HdYNSuw3cyrfXNSlnUHKH0AtyspQkvT0RFROPAhMgqrj8y1I0EW9zJEhcRzmiEwbK5ftV3a8N3FcMd1Yu2beNt0R2Ou2f1yae0FxZ/aIUOmicX3iWbUKFnfljwUUA39sEKnpp9yP7zprAf755FgEtplt3cSy+hQVCC0u7olkbIeHtmSuw/9YP9PckVk82eM7FfnK5qKEDilzR9CWgpQEak8kZeekko86WczgkRrnMj52ifGVCbIk4aY5K+uBPbQKURI9bbBra4aR0l/2Y/bOBa5jahl2Q6hrX9iAe9BMMIll9GvDxBOEV7n5H5CsZj1IrFbq5nri3qT48LgNFTDlq/ts2kAjJQORPZnp3Fq25B9ToPQt6DGGZLUG+YKGHCv73RNwUCx4Dj2oVJjNyWIYMA4EEJwcHhR+rUrHcAVJZ0SOOTJI1tJPzcasXy3d95XQgKpHSYcbXuUOtmql4oyU5ZP9QEiIscsWFS7fJs+r8Eit+H777vvc37zxjA3DM0LJ8QmB5VbkkGxYbi43dzzd3hOXz4Rvs6C08F3jDK20r+VpAqEDRo/OgBaBH4uhd+XynwVXUpKASHNaJirGGu1K8tpiX1+XOxAGqHyhZjBICeg/f8igqJs54af78AZPpvnoSQzkAhF5pDmvMINMPuJnM/ooK3O9SgJYEi4wMzu/vnAEajROE5t7d0QhSSollCx+IMpiz9XdSALZyRMNPaF2yLb3rw7gwXV7q67u/zPm79AR1GBrWbgxXei7gdA9z3TwgWdT91RfTRdSYZDsgenGCanrcpE+Wi+YEozIan9pC47xhBxzzIL9a3AUVllNIGc4qNfs9Al0M/r+kl+ndk+I2k6QFNr4aIjR/qsk52YjW/ZqmORbe2MoI4bIFS3FwlWRoYhJC78yLXOfghvl3xHJiq0Uir2vxmYdXYXfaY82g7ZtThaSqc63WZcD5CaV1Wy6jfqB1sHwuJsADE6BXPQKFfZ9t8tKE3b58rB47TFTmJb8TETgG/xK6pbaEo/Z7iWjFhJKTrcnnF4PynrJab6kw+pnU08u7/je9ZhDEf+jvK3XnqwC+A8XEktywihnrskQ7Eo9Wdmzuw9ujbY8EwQxIFK+TPpgQ8dv25aXPXspnPgiH+2lt19ok1oRIZTenv2KLXqE3wrvmXQIEbdAHFHXsTLj781/9iNdc8ta645V3ktqvz35s1c8Gr+ZbZIK5WRlrJ8TO1WcokSDK7H8hqY6CbT1QC3oFxr5pVPoqZzBMOR6g5MOPbR41XtcHlQopCKC6XeGAVd4dIuCx1CT4vqG+8RgOABxhrEeLmsHGFpBnwPtlVniZQixmOLSzQWUNoUDWMt2mwrWKb/VmzprnNmN++ybPqXhX8bD+k1NQDb7r5CwPqlzmCypXSNH9kVn0QvpqLT5elQ2295yzasW22c8mEPmSvNPM/rE/tqWJA6vAKbXOy1ktrG/TCbzGV2llAvqQqQPX8zGJrXEzKTYk+mHiIdMKpw1bWJhDUOAjdosi853Lbt2GuUjiVNMGJBXPcLLvmjjvv9oLcSYHBTuIfOkScLKKGUhabzHFPmdxgF1MB0zvVO22ooxhmhvCmq+dlag71bbP5RvTjHf50BzJZ5+ysGyM7FJm99BErHo2lTpHSKdSFF0nAlP9Z/Ybf2zTEunlz8RdmQgsq+0F+kwkxI7SqGTy0SAJbbgawNoNTptdyO33a41zprKd/3Wnp7kfoTOfmjVYdHPVFC1GywMER7ordLV3XpjrjX6R6JTd2eOZajcBCsEc+gzVqg/nR6t5y8jfS8NfzfdCMsRzEqz6vuy+M66zNIEocZiF9Tkm1r8MLwaUCE7QfEXexqkChAk9jaOzcojyOfAlXIxvVMn6yFF1gmmQtgudxsY7I/0ZjdSZlBgBFcPFT6OT+HTZ7cCAVF7J7GsGlVzwrUpqcQzSt9z3QrA0iTd4DUXgsWmFIgcdhWbPFlkaPKyZ+QXxrz2VYKCuzDWi3wzLaioFnHxLXZDt6Puo5mPiRTzSolu3fH4S31yVJ7E6e2n8zwUmnFiZ10TrrkO64b9B3TwLx1mLPap7F39DAnufj7XF4eKCdvGJEKVGc+SsyrElzKimsR4Zs9H/Jw+KOCWc/O9l8yFAc42EXUGWrq9L+B6NIaZ7hDY/sDHI748wyFPeUHhOa99BnR15Sr+IrXBG3tsXbyMgHv+gS66Nkmkllvwjpi5Q/7vJOrxrKyFS1KGl5+6N/PXj1Tn5SqWMN8Wj2mniEGD9zSaLy7DUCxmKYA9Dn3/8WQdY8yWmOyi+SFyrL6VgQ8sUQ5MNnVPhQevxB3ZQSTItofT0sE0Xv7yEYkc/T4HGVsvDRKz6RZwaZvZEg'));

payload = encryptor.decryptMessageContent(publishParameters[2].toString());

expect(payload).toEqual(expectedErrorPayload.error);
});

it('Should not provide errorInput if errorInput was empty', () => {
const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -429,7 +440,7 @@ describe('AMQP', () => {
it('Should not provide errorInput if errorInput was null', () => {

const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -469,7 +480,7 @@ describe('AMQP', () => {
it('Should send message to rebounds when rebound happened', () => {

const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -519,7 +530,7 @@ describe('AMQP', () => {
it('Should send message to rebounds with reboundIteration=3', () => {

const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -572,7 +583,7 @@ describe('AMQP', () => {
it('Should send message to errors when rebound limit exceeded', () => {

const amqp = new Amqp(settings);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']);
amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']);

const props = {
contentType: 'application/json',
Expand Down Expand Up @@ -605,7 +616,6 @@ describe('AMQP', () => {
]);

const payload = JSON.parse(publishParameters[2].toString());
console.log(payload);
payload.error = encryptor.decryptMessageContent(payload.error);
payload.errorInput = encryptor.decryptMessageContent(payload.errorInput);

Expand Down

0 comments on commit 0bd4add

Please sign in to comment.