diff --git a/lib/amqp.js b/lib/amqp.js index f93529cf..197c4662 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -73,7 +73,7 @@ class Amqp { } catch (err) { log.error(err, 'Error occurred while parsing message #%j payload', - message.fields.deliveryTag, + message.fields.deliveryTag ); return self.reject(message); } @@ -125,17 +125,28 @@ class Amqp { async publishMessage(exchangeName, routingKey, payloadBuffer, options, iteration) { const settings = this.settings; - const result = this.publishChannel.publish(exchangeName, routingKey, payloadBuffer, options); - if (!result) { - log.warn('Buffer full when publishing a message to ' - + 'exchange=%s with routingKey=%s', exchangeName, routingKey); + const publishChannel = this.publishChannel; + + function promisifiedPublish(exchangeName, routingKey, payloadBuffer, options) { + let result; + return new Promise((resolve, reject) => { + result = publishChannel.publish(exchangeName, routingKey, payloadBuffer, options, (err, ok) => { + err ? reject(err) : resolve(ok); + }); + }) + .then(() => result); } try { - await this.publishChannel.waitForConfirms(); + const result = await promisifiedPublish(exchangeName, routingKey, payloadBuffer, options); + if (!result) { + log.warn('Buffer full when publishing a message to ' + + 'exchange=%s with routingKey=%s', exchangeName, routingKey); + } + return result; } catch (error) { - log.error('Failed on publishing message to queue'); - await new Promise(resolve => { setTimeout(resolve, settings.AMQP_PUBLISH_RETRY_DELAY); }); + log.error(error, 'Failed on publishing message to queue'); + await new Promise(resolve => setTimeout(resolve, settings.AMQP_PUBLISH_RETRY_DELAY)); iteration += 1; if (iteration < settings.AMQP_PUBLISH_RETRY_ATTEMPTS) { return await this.publishMessage(exchangeName, routingKey, payloadBuffer, options, iteration); @@ -143,8 +154,6 @@ class Amqp { throw new Error(`Failed on publishing ${options.headers.messageId} message to MQ: ` + error); } } - - return result; } async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) { diff --git a/package.json b/package.json index 2bd2b6e7..23b6dc9a 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.6.0", + "version": "2.6.1", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index c472a54b..e98c205c 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -66,10 +66,15 @@ describe('AMQP', () => { spyOn(encryptor, 'decryptMessageContent').andCallThrough(); }); - it('Should send message to outgoing channel when process data', () => { + it('Should send message to outgoing channel when process data', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); - + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', contentEncoding: 'utf8', @@ -85,34 +90,39 @@ describe('AMQP', () => { 'some-other-header': 'headerValue' }, body: 'Message content' - }, props); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); - - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - settings.DATA_ROUTING_KEY, - jasmine.any(Object), - props - ]); - - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual({ - headers: { - 'some-other-header': 'headerValue' - }, - body: 'Message content' - }); + }, props) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.DATA_ROUTING_KEY, + jasmine.any(Object), + props, + jasmine.any(Function) + ]); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }); + done(); + }, () => done(new Error('Exception should not be thrown'))); }); it('Should send message async to outgoing channel when process data', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); amqp.publishChannel.publish = () => true; - spyOn(amqp.publishChannel, 'publish').andReturn(true); - + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', contentEncoding: 'utf8', @@ -125,6 +135,7 @@ describe('AMQP', () => { // One request every 500 ms const throttle = pThrottle(() => Promise.resolve(), 1, 500); const start = Date.now(); + async function test() { for (let i = 0; i < 3; i++) { await amqp.sendData({ @@ -135,6 +146,7 @@ describe('AMQP', () => { }, props, throttle); } } + test().then(() => { const duration = Math.round((Date.now() - start) / 1000); // Total duration should be around 1 seconds, because @@ -150,7 +162,8 @@ describe('AMQP', () => { settings.PUBLISH_MESSAGES_TO, settings.DATA_ROUTING_KEY, jasmine.any(Object), - props + props, + jasmine.any(Function) ]); const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); @@ -161,18 +174,18 @@ describe('AMQP', () => { body: 'Message content' }); done(); - }, done); + }, () => done(new Error('Exception should not be thrown'))); }); - it('Should throw error when message size exceeds limit', (done) => { + it('Should throw error when message size exceeds limit', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'on']); const body = 'a'.repeat(settings.OUTGOING_MESSAGE_SIZE_LIMIT + 1); const headers = {}; amqp.sendData({ body }, { headers }) - .then(() => done.fail('Exception should be thrown')) + .then(() => done(new Error('Exception should not be thrown'))) .catch(err => { expect(err.message).toEqual('Outgoing message size 13981056 exceeds limit of 10485760.'); expect(amqp.publishChannel.publish).not.toHaveBeenCalled(); @@ -180,9 +193,115 @@ describe('AMQP', () => { }); }); - it('Should sendHttpReply to outgoing channel using routing key from headers when process data', () => { + it('Should send message to outgoing channel after 3 attempts', done => { + const amqp = new Amqp(settings); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + let iteration = 0; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + iteration < 3 ? cb('Some error') : cb(null, 'Success'); + iteration++; + return true; + }); + + const props = { + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + taskId: 'task1234567890', + stepId: 'step_456' + } + }; + + amqp.sendData({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }, props) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(4); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.DATA_ROUTING_KEY, + jasmine.any(Object), + props, + jasmine.any(Function) + ]); + + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }); + done(); + }, () => done(new Error('Exception should not be thrown'))); + }); + + it('Should throw error after 3 attempts to publish message', done => { + const amqp = new Amqp(settings); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => cb('Some error')); + + const props = { + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + taskId: 'task1234567890', + stepId: 'step_456' + } + }; + + amqp.sendData({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }, props) + .then(() => done(new Error('Exception should thrown'))) + .catch(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(10); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.DATA_ROUTING_KEY, + jasmine.any(Object), + props, + jasmine.any(Function) + ]); + + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }); + done(); + }); + }); + + it('Should sendHttpReply to outgoing channel using routing key from headers when process data', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const msg = { statusCode: 200, @@ -202,24 +321,32 @@ describe('AMQP', () => { reply_to: 'my-special-routing-key' } }; - amqp.sendHttpReply(msg, props); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); - - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('my-special-routing-key'); - expect(publishParameters[2].toString()).toEqual(encryptor.encryptMessageContent(msg)); - expect(publishParameters[3]).toEqual(props); - - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual(msg); + amqp.sendHttpReply(msg, props) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('my-special-routing-key'); + expect(publishParameters[2].toString()).toEqual(encryptor.encryptMessageContent(msg)); + expect(publishParameters[3]).toEqual(props); + + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual(msg); + done(); + }, () => done(new Error('Exception should not be thrown'))); }); it('Should throw error in sendHttpReply if reply_to header not found', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const msg = { statusCode: 200, @@ -228,6 +355,7 @@ describe('AMQP', () => { }, body: 'OK' }; + async function test() { await amqp.sendHttpReply(msg, { contentType: 'application/json', @@ -239,16 +367,23 @@ describe('AMQP', () => { } }); } - test().then(() => done(new Error('should throw')), () => { + + test().then(() => done(new Error('Exception should be thrown')), () => { expect(amqp.publishChannel.publish).not.toHaveBeenCalled(); done(); }); }); - it('Should send message to outgoing channel using routing key from headers when process data', () => { + it('Should send message to outgoing channel using routing key from headers when process data', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const msg = { headers: { @@ -269,31 +404,41 @@ describe('AMQP', () => { } }; - amqp.sendData(msg, props); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); - - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - 'my-special-routing-key', - jasmine.any(Object), - props - ]); + amqp.sendData(msg, props) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + 'my-special-routing-key', + jasmine.any(Object), + props, + jasmine.any(Function) + ]); + + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: {}, + body: { + content: 'Message content' + } + }); + done(); + }, () => done(new Error('Exception should not be thrown'))); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual({ - headers: {}, - body: { - content: 'Message content' - } - }); }); - it('Should send message to errors when process error', () => { + it('Should send message to errors when process error', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', @@ -305,36 +450,40 @@ describe('AMQP', () => { } }; - amqp.sendError(new Error('Test error'), props, message.content); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); - - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - settings.ERROR_ROUTING_KEY, - jasmine.any(Object), - props - ]); - - const payload = JSON.parse(publishParameters[2].toString()); - payload.error = encryptor.decryptMessageContent(payload.error); - payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); - - expect(payload).toEqual({ - error: { - name: 'Error', - message: 'Test error', - stack: jasmine.any(String) - }, - errorInput: { - content: 'Message content' - } - }); + amqp.sendError(new Error('Test error'), props, message.content) + .then(() => { + + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.ERROR_ROUTING_KEY, + jasmine.any(Object), + props, + jasmine.any(Function) + ]); + + const payload = JSON.parse(publishParameters[2].toString()); + payload.error = encryptor.decryptMessageContent(payload.error); + payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); + + expect(payload).toEqual({ + error: { + name: 'Error', + message: 'Test error', + stack: jasmine.any(String) + }, + errorInput: { + content: 'Message content' + } + }); + done(); + }, () => done(new Error('Exception should not be thrown'))); }); - it('Should send message to errors using routing key from headers when process error', async () => { + it('Should send message to errors using routing key from headers when process error', done => { const expectedErrorPayload = { error: { name: 'Error', @@ -347,7 +496,13 @@ describe('AMQP', () => { }; const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', @@ -360,48 +515,56 @@ describe('AMQP', () => { } }; - await amqp.sendError(new Error('Test error'), props, message.content); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(2); + amqp.sendError(new Error('Test error'), props, message.content) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(2); - let publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters.length).toEqual(4); - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); - expect(publishParameters[3]).toEqual(props); + let publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters.length).toEqual(5); + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); + expect(publishParameters[3]).toEqual(props); - let payload = JSON.parse(publishParameters[2].toString()); - payload.error = encryptor.decryptMessageContent(payload.error); - payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); + let payload = JSON.parse(publishParameters[2].toString()); + payload.error = encryptor.decryptMessageContent(payload.error); + payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); - expect(payload).toEqual(expectedErrorPayload); + expect(payload).toEqual(expectedErrorPayload); - publishParameters = amqp.publishChannel.publish.calls[1].args; - expect(publishParameters.length).toEqual(4); - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('my-special-routing-key'); - expect(publishParameters[3]).toEqual({ - contentType: 'application/json', - contentEncoding: 'utf8', - mandatory: true, - headers: { - 'taskId': 'task1234567890', - 'stepId': 'step_456', - 'reply_to': 'my-special-routing-key', - 'x-eio-error-response': true - } - }); + publishParameters = amqp.publishChannel.publish.calls[1].args; + expect(publishParameters.length).toEqual(5); + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('my-special-routing-key'); + expect(publishParameters[3]).toEqual({ + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + 'taskId': 'task1234567890', + 'stepId': 'step_456', + 'reply_to': 'my-special-routing-key', + 'x-eio-error-response': true + } + }); - payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual(expectedErrorPayload.error); + expect(payload).toEqual(expectedErrorPayload.error); + done(); + }, () => done(new Error('Exception should not be thrown'))); }); - it('Should not provide errorInput if errorInput was empty', () => { + it('Should not provide errorInput if errorInput was empty', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', @@ -413,34 +576,42 @@ describe('AMQP', () => { } }; - amqp.sendError(new Error('Test error'), props, ''); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + amqp.sendError(new Error('Test error'), props, '') + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); - const payload = JSON.parse(publishParameters[2].toString()); - payload.error = encryptor.decryptMessageContent(payload.error); + const payload = JSON.parse(publishParameters[2].toString()); + payload.error = encryptor.decryptMessageContent(payload.error); - expect(payload).toEqual({ - error: { - name: 'Error', - message: 'Test error', - stack: jasmine.any(String) - } - // no errorInput should be here - }); + expect(payload).toEqual({ + error: { + name: 'Error', + message: 'Test error', + stack: jasmine.any(String) + } + // no errorInput should be here + }); - expect(publishParameters[3]).toEqual(props); + expect(publishParameters[3]).toEqual(props); + done(); + }, () => done(new Error('Exception should not be thrown'))); }); - it('Should not provide errorInput if errorInput was null', () => { + it('Should not provide errorInput if errorInput was null', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', @@ -452,35 +623,42 @@ describe('AMQP', () => { } }; - amqp.sendError(new Error('Test error'), props, null); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + amqp.sendError(new Error('Test error'), props, null) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); - const publishParameters = amqp.publishChannel.publish.calls[0].args; + const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); - const payload = JSON.parse(publishParameters[2].toString()); - payload.error = encryptor.decryptMessageContent(payload.error); + const payload = JSON.parse(publishParameters[2].toString()); + payload.error = encryptor.decryptMessageContent(payload.error); - expect(payload).toEqual({ - error: { - name: 'Error', - message: 'Test error', - stack: jasmine.any(String) - } - // no errorInput should be here - }); - - expect(publishParameters[3]).toEqual(props); + expect(payload).toEqual({ + error: { + name: 'Error', + message: 'Test error', + stack: jasmine.any(String) + } + // no errorInput should be here + }); + expect(publishParameters[3]).toEqual(props); + done(); + }, () => done(new Error('Exception should not be thrown'))); }); - it('Should send message to rebounds when rebound happened', () => { + it('Should send message to rebounds when rebound happened', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', @@ -496,41 +674,51 @@ describe('AMQP', () => { } }; - amqp.sendRebound(new Error('Rebound error'), message, props); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + amqp.sendRebound(new Error('Rebound error'), message, props) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.REBOUND_ROUTING_KEY, + jasmine.any(Object), + { + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + expiration: 15000, + headers: { + execId: 'exec1234567890', + taskId: 'task1234567890', + stepId: 'step_1', + compId: 'comp1', + function: 'list', + start: '1432815685034', + reboundIteration: 1 + } + }, + jasmine.any(Function) + ]); - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - settings.REBOUND_ROUTING_KEY, - jasmine.any(Object), - { - contentType: 'application/json', - contentEncoding: 'utf8', - mandatory: true, - expiration: 15000, - headers: { - execId: 'exec1234567890', - taskId: 'task1234567890', - stepId: 'step_1', - compId: 'comp1', - function: 'list', - start: '1432815685034', - reboundIteration: 1 - } - } - ]); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ content: 'Message content' }); + done(); + }, () => done(new Error('Exception should not be thrown'))); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual({ content: 'Message content' }); }); - it('Should send message to rebounds with reboundIteration=3', () => { + it('Should send message to rebounds with reboundIteration=3', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', @@ -549,41 +737,50 @@ describe('AMQP', () => { const clonedMessage = _.cloneDeep(message); clonedMessage.properties.headers.reboundIteration = 2; - amqp.sendRebound(new Error('Rebound error'), clonedMessage, props); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); - - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - settings.REBOUND_ROUTING_KEY, - jasmine.any(Object), - { - contentType: 'application/json', - contentEncoding: 'utf8', - mandatory: true, - expiration: 60000, - headers: { - execId: 'exec1234567890', - taskId: 'task1234567890', - stepId: 'step_1', - compId: 'comp1', - function: 'list', - start: '1432815685034', - reboundIteration: 3 - } - } - ]); + amqp.sendRebound(new Error('Rebound error'), clonedMessage, props) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.REBOUND_ROUTING_KEY, + jasmine.any(Object), + { + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + expiration: 60000, + headers: { + execId: 'exec1234567890', + taskId: 'task1234567890', + stepId: 'step_1', + compId: 'comp1', + function: 'list', + start: '1432815685034', + reboundIteration: 3 + } + }, + jasmine.any(Function) + ]); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual({ content: 'Message content' }); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ content: 'Message content' }); + done(); + }, () => done(new Error('Exception should not be thrown'))); }); - it('Should send message to errors when rebound limit exceeded', () => { + it('Should send message to errors when rebound limit exceeded', done => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish', 'waitForConfirms', 'on']); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['on']); + amqp.publishChannel.publish = () => true; + spyOn(amqp.publishChannel, 'publish') + .andCallFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }); const props = { contentType: 'application/json', @@ -602,28 +799,32 @@ describe('AMQP', () => { const clonedMessage = _.cloneDeep(message); clonedMessage.properties.headers.reboundIteration = 100; - amqp.sendRebound(new Error('Rebound error'), clonedMessage, props); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); - - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - settings.ERROR_ROUTING_KEY, - jasmine.any(Object), - props - ]); + amqp.sendRebound(new Error('Rebound error'), clonedMessage, props) + .then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.ERROR_ROUTING_KEY, + jasmine.any(Object), + props, + jasmine.any(Function) + ]); + + const payload = JSON.parse(publishParameters[2].toString()); + payload.error = encryptor.decryptMessageContent(payload.error); + payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); + + expect(payload.error.message).toEqual('Rebound limit exceeded'); + expect(payload.errorInput).toEqual({ content: 'Message content' }); + done(); + }, () => done(new Error('Exception should not be thrown'))); - const payload = JSON.parse(publishParameters[2].toString()); - payload.error = encryptor.decryptMessageContent(payload.error); - payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); - expect(payload.error.message).toEqual('Rebound limit exceeded'); - expect(payload.errorInput).toEqual({ content: 'Message content' }); }); - it('Should ack message when confirmed', () => { const amqp = new Amqp();