Skip to content

Commit

Permalink
elasticio-2840 not put incoming message into passthrough instead of o…
Browse files Browse the repository at this point in the history
…wn data (#125)

* implements-elasticio-2840_not_put_incoming_message_into_passthrough_instead_of_own_data

* #2840 added support of admiral setting

* #2840 fixed tests

* #3001 stepID header is not required

* #3001 version bump

Co-authored-by: Leonid Kuzmin <[email protected]>
  • Loading branch information
yarikos and zuker committed Jan 6, 2020
1 parent 0bd4add commit 7a7d9bd
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 98 deletions.
17 changes: 12 additions & 5 deletions lib/sailor.js
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,18 @@ class Sailor {
const props = createAmqpProperties(headers, data.id);

if (stepData.is_passthrough === true) {
const passthrough = Object.assign({}, _.omit(data, 'passthrough'));

data.passthrough = Object.assign({}, origPassthrough, {
[self.settings.STEP_ID]: passthrough
});
if (settings.NO_SELF_PASSTRHOUGH) {
const { stepId } = incomingMessageHeaders;
if (stepId) {
data.passthrough = Object.assign({}, origPassthrough, {
[stepId]: Object.assign({}, _.omit(payload, 'passthrough'))
});
}
} else {
data.passthrough = Object.assign({}, origPassthrough, {
[self.settings.STEP_ID]: Object.assign({}, _.omit(data, 'passthrough'))
});
}
}

try {
Expand Down
3 changes: 2 additions & 1 deletion lib/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ function readFrom(envVars) {
PROCESS_AMQP_DRAIN: true,
AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms
AMQP_PUBLISH_RETRY_ATTEMPTS: 10,
OUTGOING_MESSAGE_SIZE_LIMIT: 10485760
OUTGOING_MESSAGE_SIZE_LIMIT: 10485760,
NO_SELF_PASSTRHOUGH: false
};

if (envVars.ELASTICIO_ADDITIONAL_VARS_FOR_HEADERS) {
Expand Down
17 changes: 17 additions & 0 deletions mocha_spec/integration_helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ class AmqpHelper extends EventEmitter {
env.ELASTICIO_TIMEOUT = 3000;
}

// optional callback `done` is used in order to pass exceptions (e.g. from assertions in tests) to mocha callback
on(event, handler, done = undefined) {
if (!done) {
return super.on(event, handler);
}

return super.on(event, (...args) => {
try {
handler(...args);
done();
} catch (e) {
done(e);
}
});
}

publishMessage(message, { parentMessageId, threadId } = {}, headers = {}) {
return this.subscriptionChannel.publish(
env.ELASTICIO_LISTEN_MESSAGES_ON,
Expand All @@ -43,6 +59,7 @@ class AmqpHelper extends EventEmitter {
workspaceId: env.ELASTICIO_WORKSPACE_ID,
userId: env.ELASTICIO_USER_ID,
threadId,
stepId: message.headers.stepId,
messageId: parentMessageId
}, headers)
});
Expand Down
148 changes: 110 additions & 38 deletions mocha_spec/run.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ describe('Integration Test', () => {
}
];
const inputMessage = {
headers: {},
headers: {
stepId: 'step_1'
},
body: {
message: 'Just do it!'
}
Expand Down Expand Up @@ -123,8 +125,7 @@ describe('Integration Test', () => {
}
});

done();
});
}, done);

run = requireRun();

Expand All @@ -145,7 +146,7 @@ describe('Integration Test', () => {

const psMsg = Object.assign(inputMessage, {
passthrough: {
step_1: {
step_1: { // emulating an another step – just to be sure that it's not lost
id: '34',
body: {},
attachments: {}
Expand All @@ -162,7 +163,7 @@ describe('Integration Test', () => {
expect(queueName).to.eql(amqpHelper.nextStepQueue);

expect(emittedMessage.passthrough).to.deep.eql({
step_1: {
step_1: { // emulating an another step – just to be sure that it's not lost
id: '34',
body: {},
attachments: {}
Expand Down Expand Up @@ -214,13 +215,94 @@ describe('Integration Test', () => {
appId: undefined,
clusterId: undefined
});

done();
});
}, done);

run = requireRun();
});

it(
'should paste data from incoming message into passthrough and not copy own data if NO_SELF_PASSTRHOUGH',
done => {
process.env.ELASTICIO_STEP_ID = 'step_2';
process.env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003';
process.env.ELASTICIO_FUNCTION = 'emit_data';
process.env.ELASTICIO_NO_SELF_PASSTRHOUGH = 'true';

helpers.mockApiTaskStepResponse({
is_passthrough: true
});

const psMsg = Object.assign(inputMessage, {
passthrough: {
step_oth: { // emulating an another step – just to be sure that it's not lost
id: 'id-56',
body: { a: 1 },
attachments: {}
}
}
});

amqpHelper.publishMessage(psMsg, {
parentMessageId,
threadId
});

amqpHelper.on('data', ({ properties, emittedMessage }, queueName) => {
expect(queueName).to.eql(amqpHelper.nextStepQueue);

expect(emittedMessage.passthrough).to.deep.eql({
step_oth: { // emulating an another step – just to be sure that it's not lost
id: 'id-56',
body: { a: 1 },
attachments: {}
},
step_1: {
headers: inputMessage.headers,
body: inputMessage.body
}
});

delete properties.headers.start;
delete properties.headers.end;
delete properties.headers.cid;

expect(properties.headers).to.deep.equal({
taskId: env.ELASTICIO_FLOW_ID,
execId: env.ELASTICIO_EXEC_ID,
workspaceId: env.ELASTICIO_WORKSPACE_ID,
containerId: env.ELASTICIO_CONTAINER_ID,
userId: env.ELASTICIO_USER_ID,
threadId,
stepId: env.ELASTICIO_STEP_ID,
compId: env.ELASTICIO_COMP_ID,
function: env.ELASTICIO_FUNCTION,
messageId,
parentMessageId
});

delete properties.headers;

expect(properties).to.deep.eql({
contentType: 'application/json',
contentEncoding: 'utf8',
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: undefined,
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
clusterId: undefined
});
}, done);

run = requireRun();
}
);

it('should work well with async process function emitting data', done => {
process.env.ELASTICIO_STEP_ID = 'step_2';
process.env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003';
Expand All @@ -234,11 +316,15 @@ describe('Integration Test', () => {

const psMsg = Object.assign(inputMessage, {
passthrough: {
step_1: {
id: '34',
step_oth: { // emulating an another step – just to be sure that it's not lost
id: 'm-34',
body: {},
attachments: {}
}
},
headers: {
'x-custom-component-header': '123_abc',
'stepId': 'step_1'
}
});

Expand All @@ -254,19 +340,18 @@ describe('Integration Test', () => {
expect(queueName).to.eql(amqpHelper.nextStepQueue);

expect(emittedMessage.passthrough).to.deep.eql({
step_1: {
id: '34',
step_oth: { // emulating an another step – just to be sure that it's not lost
id: 'm-34',
body: {},
attachments: {}
},
step_2: {
id: messageId,
step_1: {
headers: {
'x-custom-component-header': '123_abc'
'x-custom-component-header': '123_abc',
'stepId': 'step_1'
},
body: {
id: 'someId',
hai: 'there'
message: 'Just do it!'
}
}
});
Expand Down Expand Up @@ -307,7 +392,7 @@ describe('Integration Test', () => {
appId: undefined,
clusterId: undefined
});

}, () => {
counter++;
// We need 10 messages
if (counter > 10) {
Expand Down Expand Up @@ -405,9 +490,7 @@ describe('Integration Test', () => {
}
}
});

done();
});
}, done);

run = requireRun();

Expand Down Expand Up @@ -527,8 +610,7 @@ describe('Integration Test', () => {
}
}
});
done();
});
}, done);

run = requireRun();

Expand Down Expand Up @@ -611,9 +693,7 @@ describe('Integration Test', () => {
}
}
});

done();
});
}, done);

run = requireRun();

Expand Down Expand Up @@ -663,9 +743,7 @@ describe('Integration Test', () => {
});

expect(hooksDataNock.isDone()).to.be.ok;

done();
});
}, done);

run = requireRun();

Expand Down Expand Up @@ -738,9 +816,7 @@ describe('Integration Test', () => {
}
}
});

done();
});
}, done);

run = requireRun();

Expand Down Expand Up @@ -796,9 +872,7 @@ describe('Integration Test', () => {
body: 'Ok',
statusCode: 200
});

done();
});
}, done);

run = requireRun();

Expand Down Expand Up @@ -831,9 +905,7 @@ describe('Integration Test', () => {
compId: env.ELASTICIO_COMP_ID,
function: env.ELASTICIO_FUNCTION
});

done();
});
}, done);

run = requireRun();
});
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
{
"name": "elasticio-sailor-nodejs",
"description": "The official elastic.io library for bootstrapping and executing for Node.js connectors",
"version": "2.5.4",
"version": "2.6.0",
"main": "run.js",
"scripts": {
"lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js",
"pretest": "npm run lint",
"test": "npm run test:jasmine && npm run test:mocha",
"test:jasmine": "NODE_ENV=test jasmine-node spec",
"test:mocha": "NODE_ENV=test mocha --recursive mocha_spec",
"test:mocha": "NODE_ENV=test node_modules/.bin/mocha --recursive mocha_spec",
"postpublish": "./postpublish.js"
},
"engines": {
"node": ">=10.15.0"
"node": ">=12.13.0"
},
"dependencies": {
"amqplib": "0.5.1",
Expand Down
2 changes: 1 addition & 1 deletion spec/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ module.exports = {
'parserOptions': {
'ecmaVersion': 8
}
};
};
4 changes: 3 additions & 1 deletion spec/component_reader.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ describe('Component reader', () => {
runs(() => {
expect(promise.isRejected()).toEqual(true);
var err = promise.inspect().reason;
expect(err.message).toEqual(
const { message } = err;
const [errMessage] = message.split('\n');
expect(errMessage).toEqual(
'Failed to load file \'./triggers/trigger_with_wrong_dependency.js\': '
+ 'Cannot find module \'../not-found-dependency\''
);
Expand Down
Loading

0 comments on commit 7a7d9bd

Please sign in to comment.