From 7a7d9bd7506d5c0ea593539632587510b2c26fbc Mon Sep 17 00:00:00 2001 From: yarik ponomarenko Date: Mon, 6 Jan 2020 16:44:51 +0200 Subject: [PATCH] elasticio-2840 not put incoming message into passthrough instead of own data (#125) * implements-elasticio-2840_not_put_incoming_message_into_passthrough_instead_of_own_data * #2840 added support of admiral setting * #2840 fixed tests * #3001 stepID header is not required * #3001 version bump Co-authored-by: Leonid Kuzmin --- lib/sailor.js | 17 +- lib/settings.js | 3 +- mocha_spec/integration_helpers.js | 17 ++ mocha_spec/run.spec.js | 148 ++++++++++++----- package.json | 6 +- spec/.eslintrc.js | 2 +- spec/component_reader.spec.js | 4 +- spec/sailor.spec.js | 264 ++++++++++++++++++++++++------ 8 files changed, 363 insertions(+), 98 deletions(-) diff --git a/lib/sailor.js b/lib/sailor.js index 7f57d4d5..2e9c4791 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -321,11 +321,18 @@ class Sailor { const props = createAmqpProperties(headers, data.id); if (stepData.is_passthrough === true) { - const passthrough = Object.assign({}, _.omit(data, 'passthrough')); - - data.passthrough = Object.assign({}, origPassthrough, { - [self.settings.STEP_ID]: passthrough - }); + if (settings.NO_SELF_PASSTRHOUGH) { + const { stepId } = incomingMessageHeaders; + if (stepId) { + data.passthrough = Object.assign({}, origPassthrough, { + [stepId]: Object.assign({}, _.omit(payload, 'passthrough')) + }); + } + } else { + data.passthrough = Object.assign({}, origPassthrough, { + [self.settings.STEP_ID]: Object.assign({}, _.omit(data, 'passthrough')) + }); + } } try { diff --git a/lib/settings.js b/lib/settings.js index ccc30c6e..aed68474 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -51,7 +51,8 @@ function readFrom(envVars) { PROCESS_AMQP_DRAIN: true, AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms AMQP_PUBLISH_RETRY_ATTEMPTS: 10, - OUTGOING_MESSAGE_SIZE_LIMIT: 10485760 + OUTGOING_MESSAGE_SIZE_LIMIT: 10485760, + NO_SELF_PASSTRHOUGH: false }; if (envVars.ELASTICIO_ADDITIONAL_VARS_FOR_HEADERS) { diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index b458e5ac..2f328c75 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -32,6 +32,22 @@ class AmqpHelper extends EventEmitter { env.ELASTICIO_TIMEOUT = 3000; } + // optional callback `done` is used in order to pass exceptions (e.g. from assertions in tests) to mocha callback + on(event, handler, done = undefined) { + if (!done) { + return super.on(event, handler); + } + + return super.on(event, (...args) => { + try { + handler(...args); + done(); + } catch (e) { + done(e); + } + }); + } + publishMessage(message, { parentMessageId, threadId } = {}, headers = {}) { return this.subscriptionChannel.publish( env.ELASTICIO_LISTEN_MESSAGES_ON, @@ -43,6 +59,7 @@ class AmqpHelper extends EventEmitter { workspaceId: env.ELASTICIO_WORKSPACE_ID, userId: env.ELASTICIO_USER_ID, threadId, + stepId: message.headers.stepId, messageId: parentMessageId }, headers) }); diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 9a7e06eb..a6b9c3e6 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -19,7 +19,9 @@ describe('Integration Test', () => { } ]; const inputMessage = { - headers: {}, + headers: { + stepId: 'step_1' + }, body: { message: 'Just do it!' } @@ -123,8 +125,7 @@ describe('Integration Test', () => { } }); - done(); - }); + }, done); run = requireRun(); @@ -145,7 +146,7 @@ describe('Integration Test', () => { const psMsg = Object.assign(inputMessage, { passthrough: { - step_1: { + step_1: { // emulating an another step – just to be sure that it's not lost id: '34', body: {}, attachments: {} @@ -162,7 +163,7 @@ describe('Integration Test', () => { expect(queueName).to.eql(amqpHelper.nextStepQueue); expect(emittedMessage.passthrough).to.deep.eql({ - step_1: { + step_1: { // emulating an another step – just to be sure that it's not lost id: '34', body: {}, attachments: {} @@ -214,13 +215,94 @@ describe('Integration Test', () => { appId: undefined, clusterId: undefined }); - - done(); - }); + }, done); run = requireRun(); }); + it( + 'should paste data from incoming message into passthrough and not copy own data if NO_SELF_PASSTRHOUGH', + done => { + process.env.ELASTICIO_STEP_ID = 'step_2'; + process.env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; + process.env.ELASTICIO_FUNCTION = 'emit_data'; + process.env.ELASTICIO_NO_SELF_PASSTRHOUGH = 'true'; + + helpers.mockApiTaskStepResponse({ + is_passthrough: true + }); + + const psMsg = Object.assign(inputMessage, { + passthrough: { + step_oth: { // emulating an another step – just to be sure that it's not lost + id: 'id-56', + body: { a: 1 }, + attachments: {} + } + } + }); + + amqpHelper.publishMessage(psMsg, { + parentMessageId, + threadId + }); + + amqpHelper.on('data', ({ properties, emittedMessage }, queueName) => { + expect(queueName).to.eql(amqpHelper.nextStepQueue); + + expect(emittedMessage.passthrough).to.deep.eql({ + step_oth: { // emulating an another step – just to be sure that it's not lost + id: 'id-56', + body: { a: 1 }, + attachments: {} + }, + step_1: { + headers: inputMessage.headers, + body: inputMessage.body + } + }); + + delete properties.headers.start; + delete properties.headers.end; + delete properties.headers.cid; + + expect(properties.headers).to.deep.equal({ + taskId: env.ELASTICIO_FLOW_ID, + execId: env.ELASTICIO_EXEC_ID, + workspaceId: env.ELASTICIO_WORKSPACE_ID, + containerId: env.ELASTICIO_CONTAINER_ID, + userId: env.ELASTICIO_USER_ID, + threadId, + stepId: env.ELASTICIO_STEP_ID, + compId: env.ELASTICIO_COMP_ID, + function: env.ELASTICIO_FUNCTION, + messageId, + parentMessageId + }); + + delete properties.headers; + + expect(properties).to.deep.eql({ + contentType: 'application/json', + contentEncoding: 'utf8', + deliveryMode: undefined, + priority: undefined, + correlationId: undefined, + replyTo: undefined, + expiration: undefined, + messageId: undefined, + timestamp: undefined, + type: undefined, + userId: undefined, + appId: undefined, + clusterId: undefined + }); + }, done); + + run = requireRun(); + } + ); + it('should work well with async process function emitting data', done => { process.env.ELASTICIO_STEP_ID = 'step_2'; process.env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; @@ -234,11 +316,15 @@ describe('Integration Test', () => { const psMsg = Object.assign(inputMessage, { passthrough: { - step_1: { - id: '34', + step_oth: { // emulating an another step – just to be sure that it's not lost + id: 'm-34', body: {}, attachments: {} } + }, + headers: { + 'x-custom-component-header': '123_abc', + 'stepId': 'step_1' } }); @@ -254,19 +340,18 @@ describe('Integration Test', () => { expect(queueName).to.eql(amqpHelper.nextStepQueue); expect(emittedMessage.passthrough).to.deep.eql({ - step_1: { - id: '34', + step_oth: { // emulating an another step – just to be sure that it's not lost + id: 'm-34', body: {}, attachments: {} }, - step_2: { - id: messageId, + step_1: { headers: { - 'x-custom-component-header': '123_abc' + 'x-custom-component-header': '123_abc', + 'stepId': 'step_1' }, body: { - id: 'someId', - hai: 'there' + message: 'Just do it!' } } }); @@ -307,7 +392,7 @@ describe('Integration Test', () => { appId: undefined, clusterId: undefined }); - + }, () => { counter++; // We need 10 messages if (counter > 10) { @@ -405,9 +490,7 @@ describe('Integration Test', () => { } } }); - - done(); - }); + }, done); run = requireRun(); @@ -527,8 +610,7 @@ describe('Integration Test', () => { } } }); - done(); - }); + }, done); run = requireRun(); @@ -611,9 +693,7 @@ describe('Integration Test', () => { } } }); - - done(); - }); + }, done); run = requireRun(); @@ -663,9 +743,7 @@ describe('Integration Test', () => { }); expect(hooksDataNock.isDone()).to.be.ok; - - done(); - }); + }, done); run = requireRun(); @@ -738,9 +816,7 @@ describe('Integration Test', () => { } } }); - - done(); - }); + }, done); run = requireRun(); @@ -796,9 +872,7 @@ describe('Integration Test', () => { body: 'Ok', statusCode: 200 }); - - done(); - }); + }, done); run = requireRun(); @@ -831,9 +905,7 @@ describe('Integration Test', () => { compId: env.ELASTICIO_COMP_ID, function: env.ELASTICIO_FUNCTION }); - - done(); - }); + }, done); run = requireRun(); }); diff --git a/package.json b/package.json index fc62f32b..2bd2b6e7 100644 --- a/package.json +++ b/package.json @@ -1,18 +1,18 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.5.4", + "version": "2.6.0", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", "pretest": "npm run lint", "test": "npm run test:jasmine && npm run test:mocha", "test:jasmine": "NODE_ENV=test jasmine-node spec", - "test:mocha": "NODE_ENV=test mocha --recursive mocha_spec", + "test:mocha": "NODE_ENV=test node_modules/.bin/mocha --recursive mocha_spec", "postpublish": "./postpublish.js" }, "engines": { - "node": ">=10.15.0" + "node": ">=12.13.0" }, "dependencies": { "amqplib": "0.5.1", diff --git a/spec/.eslintrc.js b/spec/.eslintrc.js index 4de3c09a..c1d388d6 100644 --- a/spec/.eslintrc.js +++ b/spec/.eslintrc.js @@ -13,4 +13,4 @@ module.exports = { 'parserOptions': { 'ecmaVersion': 8 } -}; \ No newline at end of file +}; diff --git a/spec/component_reader.spec.js b/spec/component_reader.spec.js index cccabe3e..4f5e889b 100644 --- a/spec/component_reader.spec.js +++ b/spec/component_reader.spec.js @@ -89,7 +89,9 @@ describe('Component reader', () => { runs(() => { expect(promise.isRejected()).toEqual(true); var err = promise.inspect().reason; - expect(err.message).toEqual( + const { message } = err; + const [errMessage] = message.split('\n'); + expect(errMessage).toEqual( 'Failed to load file \'./triggers/trigger_with_wrong_dependency.js\': ' + 'Cannot find module \'../not-found-dependency\'' ); diff --git a/spec/sailor.spec.js b/spec/sailor.spec.js index 9ba1d69f..f14756e9 100644 --- a/spec/sailor.spec.js +++ b/spec/sailor.spec.js @@ -35,44 +35,44 @@ describe('Sailor', () => { const encryptor = require('../lib/encryptor.js'); const Sailor = require('../lib/sailor.js').Sailor; const _ = require('lodash'); - const Q = require('q'); const payload = { param1: 'Value1' }; - const message = { - fields: { - consumerTag: 'abcde', - deliveryTag: 12345, - exchange: 'test', - routingKey: 'test.hello' - }, - properties: { - contentType: 'application/json', - contentEncoding: 'utf8', - headers: { - taskId: '5559edd38968ec0736000003', - execId: 'some-exec-id', - userId: '5559edd38968ec0736000002', - workspaceId: '5559edd38968ec073600683' - }, - deliveryMode: undefined, - priority: undefined, - correlationId: undefined, - replyTo: undefined, - expiration: undefined, - messageId: undefined, - timestamp: undefined, - type: undefined, - userId: undefined, - appId: undefined, - mandatory: true, - clusterId: '' - }, - content: Buffer.from(encryptor.encryptMessageContent(payload)) - }; + let message; beforeEach(() => { settings = require('../lib/settings').readFrom(envVars); + message = { + fields: { + consumerTag: 'abcde', + deliveryTag: 12345, + exchange: 'test', + routingKey: 'test.hello' + }, + properties: { + contentType: 'application/json', + contentEncoding: 'utf8', + headers: { + taskId: '5559edd38968ec0736000003', + execId: 'some-exec-id', + userId: '5559edd38968ec0736000002', + workspaceId: '5559edd38968ec073600683' + }, + deliveryMode: undefined, + priority: undefined, + correlationId: undefined, + replyTo: undefined, + expiration: undefined, + messageId: undefined, + timestamp: undefined, + type: undefined, + userId: undefined, + appId: undefined, + mandatory: true, + clusterId: '' + }, + content: Buffer.from(encryptor.encryptMessageContent(payload)) + }; }); describe('init', () => { @@ -186,7 +186,7 @@ describe('Sailor', () => { describe('disconnection', () => { it('should disconnect Mongo and RabbitMQ, and exit process', done => { const fakeAMQPConnection = jasmine.createSpyObj('AMQPConnection', ['disconnect']); - fakeAMQPConnection.disconnect.andReturn(Q.resolve()); + fakeAMQPConnection.disconnect.andReturn(Promise.resolve()); spyOn(amqp, 'Amqp').andReturn(fakeAMQPConnection); spyOn(process, 'exit').andReturn(0); @@ -221,7 +221,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.connect() @@ -283,7 +283,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.connect() @@ -332,7 +332,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.connect() @@ -360,7 +360,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({ is_passthrough: true }); + return Promise.resolve({ is_passthrough: true }); }); const psPayload = { @@ -425,6 +425,161 @@ describe('Sailor', () => { }) .catch(done); //todo: use done.fail after migration to Jasmine 2.x }); + + it( + 'should augment emitted message with passthrough with data from incoming message ' + + 'if NO_SELF_PASSTRHOUGH set', + done => { + message.properties.headers.stepId = 'step_0'; + settings.FUNCTION = 'passthrough'; + settings.NO_SELF_PASSTRHOUGH = true; + const sailor = new Sailor(settings); + + spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { + expect(taskId).toEqual('5559edd38968ec0736000003'); + expect(stepId).toEqual('step_1'); + return Promise.resolve({ is_passthrough: true }); + }); + + const psPayload = { + body: payload, + passthrough: { + step_oth: { + body: { key: 'value' } + } + } + }; + + sailor.connect() + .then(() => sailor.prepare()) + .then(() => sailor.processMessage(psPayload, message)) + .then(() => { + expect(sailor.apiClient.tasks.retrieveStep).toHaveBeenCalled(); + expect(fakeAMQPConnection.connect).toHaveBeenCalled(); + expect(fakeAMQPConnection.sendData).toHaveBeenCalled(); + + const sendDataCalls = fakeAMQPConnection.sendData.calls; + + expect(sendDataCalls[0].args[0]).toEqual({ + body: { + param1: 'Value1' + }, + passthrough: { + step_oth: { + body: { + key: 'value' + } + }, + step_0: { + body: { param1: 'Value1' } + } + } + }); + expect(sendDataCalls[0].args[1]).toEqual(jasmine.any(Object)); + expect(sendDataCalls[0].args[1]).toEqual({ + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + execId: 'some-exec-id', + taskId: '5559edd38968ec0736000003', + userId: '5559edd38968ec0736000002', + containerId: 'dc1c8c3f-f9cb-49e1-a6b8-716af9e15948', + workspaceId: '5559edd38968ec073600683', + stepId: 'step_1', + compId: '5559edd38968ec0736000456', + function: 'passthrough', + start: jasmine.any(Number), + cid: 1, + end: jasmine.any(Number), + messageId: jasmine.any(String) + } + }); + + expect(fakeAMQPConnection.ack).toHaveBeenCalled(); + expect(fakeAMQPConnection.ack.callCount).toEqual(1); + expect(fakeAMQPConnection.ack.calls[0].args[0]).toEqual(message); + done(); + }) + .catch(done); //todo: use done.fail after migration to Jasmine 2.x + } + ); + + it( + 'should not augment emitted message with passthrough with data from incoming message ' + + 'if NO_SELF_PASSTRHOUGH set without stepId header', + done => { + settings.FUNCTION = 'passthrough'; + settings.NO_SELF_PASSTRHOUGH = true; + const sailor = new Sailor(settings); + + spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { + expect(taskId).toEqual('5559edd38968ec0736000003'); + expect(stepId).toEqual('step_1'); + return Promise.resolve({ is_passthrough: true }); + }); + + const psPayload = { + body: payload, + passthrough: { + step_oth: { + body: { key: 'value' } + } + } + }; + + sailor.connect() + .then(() => sailor.prepare()) + .then(() => sailor.processMessage(psPayload, message)) + .then(() => { + expect(sailor.apiClient.tasks.retrieveStep).toHaveBeenCalled(); + expect(fakeAMQPConnection.connect).toHaveBeenCalled(); + expect(fakeAMQPConnection.sendData).toHaveBeenCalled(); + + const sendDataCalls = fakeAMQPConnection.sendData.calls; + + expect(sendDataCalls[0].args[0]).toEqual({ + body: { + param1: 'Value1' + }, + passthrough: { + step_oth: { + body: { + key: 'value' + } + } + } + }); + expect(sendDataCalls[0].args[1]).toEqual(jasmine.any(Object)); + expect(sendDataCalls[0].args[1]).toEqual({ + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + execId: 'some-exec-id', + taskId: '5559edd38968ec0736000003', + userId: '5559edd38968ec0736000002', + containerId: 'dc1c8c3f-f9cb-49e1-a6b8-716af9e15948', + workspaceId: '5559edd38968ec073600683', + stepId: 'step_1', + compId: '5559edd38968ec0736000456', + function: 'passthrough', + start: jasmine.any(Number), + cid: 1, + end: jasmine.any(Number), + messageId: jasmine.any(String) + } + }); + + expect(fakeAMQPConnection.ack).toHaveBeenCalled(); + expect(fakeAMQPConnection.ack.callCount).toEqual(1); + expect(fakeAMQPConnection.ack.calls[0].args[0]).toEqual(message); + done(); + }) + .catch(done); //todo: use done.fail after migration to Jasmine 2.x + } + ); + it('should provide access to flow vairables', done => { settings.FUNCTION = 'use_flow_variables'; const sailor = new Sailor(settings); @@ -432,7 +587,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({ + return Promise.resolve({ is_passthrough: true, variables: { var1: 'val1', @@ -469,7 +624,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({ + return Promise.resolve({ config: { _account: '1234567890' } @@ -479,7 +634,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.accounts, 'update').andCallFake((accountId, keys) => { expect(accountId).toEqual('1234567890'); expect(keys).toEqual({ keys: { oauth: { access_token: 'newAccessToken' } } }); - return Q(); + return Promise.resolve(); }); sailor.prepare() @@ -545,7 +700,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.prepare() @@ -575,7 +730,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.prepare() @@ -629,7 +784,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({ + return Promise.resolve({ snapshot: { someId: 'someData' } @@ -687,7 +842,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.prepare() @@ -718,7 +873,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.prepare() @@ -747,7 +902,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.prepare() @@ -785,7 +940,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.prepare() @@ -806,7 +961,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({}); + return Promise.resolve({}); }); sailor.prepare() @@ -992,7 +1147,9 @@ describe('Sailor', () => { try { sailor.readIncomingMessageHeaders({ properties: { - headers: {} + headers: { + stepId: 'step_0' + } } }); throw new Error('Must not be reached'); @@ -1008,6 +1165,7 @@ describe('Sailor', () => { sailor.readIncomingMessageHeaders({ properties: { headers: { + stepId: 'step_0', execId: 'my_exec_123' } } @@ -1026,6 +1184,7 @@ describe('Sailor', () => { properties: { headers: { execId: 'my_exec_123', + stepId: 'step_0', taskId: 'my_task_123' } } @@ -1045,6 +1204,7 @@ describe('Sailor', () => { headers: { execId: 'my_exec_123', taskId: 'my_task_123', + stepId: 'step_0', userId: 'my_user_123' } } @@ -1061,6 +1221,7 @@ describe('Sailor', () => { const headers = { execId: 'my_exec_123', taskId: settings.FLOW_ID, + stepId: settings.STEP_ID, userId: 'my_user_123' }; @@ -1081,6 +1242,7 @@ describe('Sailor', () => { const headers = { execId: 'my_exec_123', taskId: settings.FLOW_ID, + stepId: settings.STEP_ID, userId: 'my_user_123', messageId }; @@ -1094,6 +1256,7 @@ describe('Sailor', () => { expect(result).toEqual({ execId: 'my_exec_123', taskId: settings.FLOW_ID, + stepId: settings.STEP_ID, userId: 'my_user_123', parentMessageId: messageId }); @@ -1105,6 +1268,7 @@ describe('Sailor', () => { const headers = { execId: 'my_exec_123', taskId: settings.FLOW_ID, + stepId: settings.STEP_ID, userId: 'my_user_123', reply_to: 'my_reply_to_exchange' }; @@ -1124,6 +1288,7 @@ describe('Sailor', () => { const headers = { 'execId': 'my_exec_123', 'taskId': settings.FLOW_ID, + 'stepId': settings.STEP_ID, 'userId': 'my_user_123', 'reply_to': 'my_reply_to_exchange', 'x-eio-meta-lowercase': 'I am lowercase', @@ -1139,6 +1304,7 @@ describe('Sailor', () => { expect(result).toEqual({ 'execId': 'my_exec_123', 'taskId': settings.FLOW_ID, + 'stepId': settings.STEP_ID, 'userId': 'my_user_123', 'reply_to': 'my_reply_to_exchange', 'x-eio-meta-lowercase': 'I am lowercase',