Skip to content

Commit

Permalink
Reduse overhead of monitor pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
watson committed Dec 2, 2020
1 parent 492bd38 commit 33e1144
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
29 changes: 22 additions & 7 deletions lib/monitor.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict';

const Os = require('os');
const { pipeline } = require('stream');

const Hoek = require('@hapi/hoek');
const Oppsy = require('@hapi/oppsy');
const Pumpify = require('pumpify');

const Package = require('../package.json');
const Utils = require('./utils');
Expand Down Expand Up @@ -119,15 +119,11 @@ module.exports = internals.Monitor = class {
streamObjs.push(stream);
}

if (streamObjs.length === 1) {
streamObjs.unshift(new Utils.NoOp());
}

this._reporters.set(reporterName, Pumpify.obj(streamObjs)).get(reporterName).on('error', (err) => {
this._reporters.set(reporterName, setupPipeline(streamObjs, (err) => {

console.error(`There was a problem (${err}) in ${reporterName} and it has been destroyed.`);
console.error(err);
});
}));
}

this._state.report = true;
Expand Down Expand Up @@ -208,3 +204,22 @@ module.exports = internals.Monitor = class {
}
}
};

const setupPipeline = (streams, onError) => {

const stream = streams[0];

if (streams.length === 1) {
stream.on('error', onError);
}
else {
pipeline(streams, (err) => {

if (err) {
onError(err);
}
});
}

return stream;
};
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
"dependencies": {
"@hapi/hoek": "9.x.x",
"@hapi/oppsy": "3.x.x",
"@hapi/validate": "1.x.x",
"pumpify": "2.x.x"
"@hapi/validate": "1.x.x"
},
"devDependencies": {
"@hapi/code": "8.x.x",
Expand Down
14 changes: 11 additions & 3 deletions test/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe('Monitor', () => {
monitor.stop();
});

it('logs and destroys a reporter in the event of a stream error', { plan: 3 }, () => {
it('logs and destroys a reporter in the event of a stream error', { plan: 3 }, async () => {

const one = new Reporters.Incrementer(1);
const two = new Reporters.Writer(true);
Expand All @@ -114,12 +114,20 @@ describe('Monitor', () => {
// Verion 8 of node misses this change inside monitor, so force it here
const foo = monitor._reporters.get('foo');
foo.destroyed = true;
foo.emit('error');
foo.emit('error', new Error('bar'));
monitor.push(() => ({ id: 3, number: 100 }));

expect(two.data).to.have.length(2);
expect(two.data).to.equal([{ id: 1, number: 3 }, { id: 2, number: 6 }]);
console.error = err;

await new Promise((resolve) => {

process.nextTick(() => {

console.error = err;
resolve();
});
});
});

describe('start()', () => {
Expand Down

0 comments on commit 33e1144

Please sign in to comment.