diff --git a/lib/amqp.js b/lib/amqp.js index 14f8895d..f93529cf 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -104,11 +104,13 @@ class Amqp { if (throttle) { await throttle(); } - log.trace('Pushing to exchange=%s, routingKey=%s, data=%j, ' - + 'options=%j', exchangeName, routingKey, payload, options); + const buffer = Buffer.from(payload); + + log.trace('Pushing to exchange=%s, routingKey=%s, messageSize=%d, options=%j', + exchangeName, routingKey, buffer.length, options); log.debug('Current memory usage: %s Mb', process.memoryUsage().heapUsed / 1048576); - const result = await this.publishMessage(exchangeName, routingKey, Buffer.from(payload), options, 0); + const result = await this.publishMessage(exchangeName, routingKey, buffer, options, 0); if (this.settings.PROCESS_AMQP_DRAIN) { if (result) { return result; @@ -138,7 +140,7 @@ class Amqp { if (iteration < settings.AMQP_PUBLISH_RETRY_ATTEMPTS) { return await this.publishMessage(exchangeName, routingKey, payloadBuffer, options, iteration); } else { - throw new Error(`failed on publishing ${options.headers.messageId} message to MQ: ` + error); + throw new Error(`Failed on publishing ${options.headers.messageId} message to MQ: ` + error); } } @@ -150,6 +152,13 @@ class Amqp { data.headers = filterMessageHeaders(data.headers); const encryptedData = encryptor.encryptMessageContent(data); + if (encryptedData.length > settings.OUTGOING_MESSAGE_SIZE_LIMIT) { + const error = new Error(`Outgoing message size ${encryptedData.length}` + + ` exceeds limit of ${settings.OUTGOING_MESSAGE_SIZE_LIMIT}.`); + log.error(error); + throw error; + } + return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle); } diff --git a/lib/sailor.js b/lib/sailor.js index 0957119c..7f57d4d5 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -328,7 +328,11 @@ class Sailor { }); } - return self.amqpConnection.sendData(data, props, self.throttles.data); + try { + return await self.amqpConnection.sendData(data, props, self.throttles.data); + } catch (err) { + return onError(err); + } } async function onHttpReply(reply) { diff --git a/lib/settings.js b/lib/settings.js index 13ca014a..ccc30c6e 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -50,7 +50,8 @@ function readFrom(envVars) { RATE_INTERVAL: 100, // 100ms PROCESS_AMQP_DRAIN: true, AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms - AMQP_PUBLISH_RETRY_ATTEMPTS: 10 + AMQP_PUBLISH_RETRY_ATTEMPTS: 10, + OUTGOING_MESSAGE_SIZE_LIMIT: 10485760 }; if (envVars.ELASTICIO_ADDITIONAL_VARS_FOR_HEADERS) { diff --git a/package.json b/package.json index d501e849..fc62f32b 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.5.3", + "version": "2.5.4", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index 6db6f814..c472a54b 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -68,7 +68,7 @@ describe('AMQP', () => { it('Should send message to outgoing channel when process data', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const props = { contentType: 'application/json', @@ -109,10 +109,9 @@ describe('AMQP', () => { it('Should send message async to outgoing channel when process data', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['waitForConfirms', 'on']); amqp.publishChannel.publish = () => true; spyOn(amqp.publishChannel, 'publish').andReturn(true); - amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const props = { contentType: 'application/json', @@ -165,9 +164,25 @@ describe('AMQP', () => { }, done); }); + it('Should throw error when message size exceeds limit', (done) => { + const amqp = new Amqp(settings); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + + const body = 'a'.repeat(settings.OUTGOING_MESSAGE_SIZE_LIMIT + 1); + const headers = {}; + + amqp.sendData({ body }, { headers }) + .then(() => done.fail('Exception should be thrown')) + .catch(err => { + expect(err.message).toEqual('Outgoing message size 13981056 exceeds limit of 10485760.'); + expect(amqp.publishChannel.publish).not.toHaveBeenCalled(); + done(); + }); + }); + it('Should sendHttpReply to outgoing channel using routing key from headers when process data', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const msg = { statusCode: 200, @@ -204,7 +219,7 @@ describe('AMQP', () => { it('Should throw error in sendHttpReply if reply_to header not found', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const msg = { statusCode: 200, @@ -224,7 +239,7 @@ describe('AMQP', () => { } }); } - test().then(() => done(new Error('should throw')),(err) => { + test().then(() => done(new Error('should throw')), () => { expect(amqp.publishChannel.publish).not.toHaveBeenCalled(); done(); }); @@ -233,7 +248,7 @@ describe('AMQP', () => { it('Should send message to outgoing channel using routing key from headers when process data', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const msg = { headers: { @@ -278,7 +293,7 @@ describe('AMQP', () => { it('Should send message to errors when process error', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const props = { contentType: 'application/json', @@ -332,7 +347,7 @@ describe('AMQP', () => { }; const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const props = { contentType: 'application/json', @@ -379,10 +394,6 @@ describe('AMQP', () => { } }); - console.log('AND BACK'); - //eslint-disable-next-line max-len - console.log(encryptor.decryptMessageContent('+PAlXNRj+5HdYNSuw3cyrfXNSlnUHKH0AtyspQkvT0RFROPAhMgqrj8y1I0EW9zJEhcRzmiEwbK5ftV3a8N3FcMd1Yu2beNt0R2Ou2f1yae0FxZ/aIUOmicX3iWbUKFnfljwUUA39sEKnpp9yP7zprAf755FgEtplt3cSy+hQVCC0u7olkbIeHtmSuw/9YP9PckVk82eM7FfnK5qKEDilzR9CWgpQEak8kZeekko86WczgkRrnMj52ifGVCbIk4aY5K+uBPbQKURI9bbBra4aR0l/2Y/bOBa5jahl2Q6hrX9iAe9BMMIll9GvDxBOEV7n5H5CsZj1IrFbq5nri3qT48LgNFTDlq/ts2kAjJQORPZnp3Fq25B9ToPQt6DGGZLUG+YKGHCv73RNwUCx4Dj2oVJjNyWIYMA4EEJwcHhR+rUrHcAVJZ0SOOTJI1tJPzcasXy3d95XQgKpHSYcbXuUOtmql4oyU5ZP9QEiIscsWFS7fJs+r8Eit+H777vvc37zxjA3DM0LJ8QmB5VbkkGxYbi43dzzd3hOXz4Rvs6C08F3jDK20r+VpAqEDRo/OgBaBH4uhd+XynwVXUpKASHNaJirGGu1K8tpiX1+XOxAGqHyhZjBICeg/f8igqJs54af78AZPpvnoSQzkAhF5pDmvMINMPuJnM/ooK3O9SgJYEi4wMzu/vnAEajROE5t7d0QhSSollCx+IMpiz9XdSALZyRMNPaF2yLb3rw7gwXV7q67u/zPm79AR1GBrWbgxXei7gdA9z3TwgWdT91RfTRdSYZDsgenGCanrcpE+Wi+YEozIan9pC47xhBxzzIL9a3AUVllNIGc4qNfs9Al0M/r+kl+ndk+I2k6QFNr4aIjR/qsk52YjW/ZqmORbe2MoI4bIFS3FwlWRoYhJC78yLXOfghvl3xHJiq0Uir2vxmYdXYXfaY82g7ZtThaSqc63WZcD5CaV1Wy6jfqB1sHwuJsADE6BXPQKFfZ9t8tKE3b58rB47TFTmJb8TETgG/xK6pbaEo/Z7iWjFhJKTrcnnF4PynrJab6kw+pnU08u7/je9ZhDEf+jvK3XnqwC+A8XEktywihnrskQ7Eo9Wdmzuw9ujbY8EwQxIFK+TPpgQ8dv25aXPXspnPgiH+2lt19ok1oRIZTenv2KLXqE3wrvmXQIEbdAHFHXsTLj781/9iNdc8ta645V3ktqvz35s1c8Gr+ZbZIK5WRlrJ8TO1WcokSDK7H8hqY6CbT1QC3oFxr5pVPoqZzBMOR6g5MOPbR41XtcHlQopCKC6XeGAVd4dIuCx1CT4vqG+8RgOABxhrEeLmsHGFpBnwPtlVniZQixmOLSzQWUNoUDWMt2mwrWKb/VmzprnNmN++ybPqXhX8bD+k1NQDb7r5CwPqlzmCypXSNH9kVn0QvpqLT5elQ2295yzasW22c8mEPmSvNPM/rE/tqWJA6vAKbXOy1ktrG/TCbzGV2llAvqQqQPX8zGJrXEzKTYk+mHiIdMKpw1bWJhDUOAjdosi853Lbt2GuUjiVNMGJBXPcLLvmjjvv9oLcSYHBTuIfOkScLKKGUhabzHFPmdxgF1MB0zvVO22ooxhmhvCmq+dlag71bbP5RvTjHf50BzJZ5+ysGyM7FJm99BErHo2lTpHSKdSFF0nAlP9Z/Ybf2zTEunlz8RdmQgsq+0F+kwkxI7SqGTy0SAJbbgawNoNTptdyO33a41zprKd/3Wnp7kfoTOfmjVYdHPVFC1GywMER7ordLV3XpjrjX6R6JTd2eOZajcBCsEc+gzVqg/nR6t5y8jfS8NfzfdCMsRzEqz6vuy+M66zNIEocZiF9Tkm1r8MLwaUCE7QfEXexqkChAk9jaOzcojyOfAlXIxvVMn6yFF1gmmQtgudxsY7I/0ZjdSZlBgBFcPFT6OT+HTZ7cCAVF7J7GsGlVzwrUpqcQzSt9z3QrA0iTd4DUXgsWmFIgcdhWbPFlkaPKyZ+QXxrz2VYKCuzDWi3wzLaioFnHxLXZDt6Puo5mPiRTzSolu3fH4S31yVJ7E6e2n8zwUmnFiZ10TrrkO64b9B3TwLx1mLPap7F39DAnufj7XF4eKCdvGJEKVGc+SsyrElzKimsR4Zs9H/Jw+KOCWc/O9l8yFAc42EXUGWrq9L+B6NIaZ7hDY/sDHI748wyFPeUHhOa99BnR15Sr+IrXBG3tsXbyMgHv+gS66Nkmkllvwjpi5Q/7vJOrxrKyFS1KGl5+6N/PXj1Tn5SqWMN8Wj2mniEGD9zSaLy7DUCxmKYA9Dn3/8WQdY8yWmOyi+SFyrL6VgQ8sUQ5MNnVPhQevxB3ZQSTItofT0sE0Xv7yEYkc/T4HGVsvDRKz6RZwaZvZEg')); - payload = encryptor.decryptMessageContent(publishParameters[2].toString()); expect(payload).toEqual(expectedErrorPayload.error); @@ -390,7 +401,7 @@ describe('AMQP', () => { it('Should not provide errorInput if errorInput was empty', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const props = { contentType: 'application/json', @@ -429,7 +440,7 @@ describe('AMQP', () => { it('Should not provide errorInput if errorInput was null', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const props = { contentType: 'application/json', @@ -469,7 +480,7 @@ describe('AMQP', () => { it('Should send message to rebounds when rebound happened', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const props = { contentType: 'application/json', @@ -519,7 +530,7 @@ describe('AMQP', () => { it('Should send message to rebounds with reboundIteration=3', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const props = { contentType: 'application/json', @@ -572,7 +583,7 @@ describe('AMQP', () => { it('Should send message to errors when rebound limit exceeded', () => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); const props = { contentType: 'application/json', @@ -605,7 +616,6 @@ describe('AMQP', () => { ]); const payload = JSON.parse(publishParameters[2].toString()); - console.log(payload); payload.error = encryptor.decryptMessageContent(payload.error); payload.errorInput = encryptor.decryptMessageContent(payload.errorInput);