Skip to content

Commit

Permalink
Reduce overhead of monitor pipeline (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
watson authored Dec 2, 2020
1 parent 1e87fb1 commit 1bf8478
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 30 deletions.
30 changes: 21 additions & 9 deletions lib/monitor.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict';

const Os = require('os');
const { pipeline, finished } = require('stream');

const Hoek = require('@hapi/hoek');
const Oppsy = require('@hapi/oppsy');
const Pumpify = require('pumpify');

const Package = require('../package.json');
const Utils = require('./utils');
Expand Down Expand Up @@ -119,15 +119,13 @@ module.exports = internals.Monitor = class {
streamObjs.push(stream);
}

if (streamObjs.length === 1) {
streamObjs.unshift(new Utils.NoOp());
}

this._reporters.set(reporterName, Pumpify.obj(streamObjs)).get(reporterName).on('error', (err) => {
this._reporters.set(reporterName, setupPipeline(streamObjs, (err) => {

console.error(`There was a problem (${err}) in ${reporterName} and it has been destroyed.`);
console.error(err);
});
if (err) {
console.error(`There was a problem (${err}) in ${reporterName} and it has been destroyed.`);
console.error(err);
}
}));
}

this._state.report = true;
Expand Down Expand Up @@ -208,3 +206,17 @@ module.exports = internals.Monitor = class {
}
}
};

const setupPipeline = (streams, onFinished) => {

const stream = streams[0];

if (streams.length === 1) {
finished(stream, onFinished);
}
else {
pipeline(streams, onFinished);
}

return stream;
};
16 changes: 0 additions & 16 deletions lib/utils.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
'use strict';

const Stream = require('stream');

const Hoek = require('@hapi/hoek');


Expand Down Expand Up @@ -168,17 +166,3 @@ exports.RequestLog = class {
}
}
};


exports.NoOp = class extends Stream.Transform {

constructor() {

super({ objectMode: true });
}

_transform(value, encoding, callback) {

callback(null, value);
}
};
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
"dependencies": {
"@hapi/hoek": "9.x.x",
"@hapi/oppsy": "3.x.x",
"@hapi/validate": "1.x.x",
"pumpify": "2.x.x"
"@hapi/validate": "1.x.x"
},
"devDependencies": {
"@hapi/code": "8.x.x",
Expand Down
14 changes: 11 additions & 3 deletions test/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe('Monitor', () => {
monitor.stop();
});

it('logs and destroys a reporter in the event of a stream error', { plan: 3 }, () => {
it('logs and destroys a reporter in the event of a stream error', { plan: 3 }, async () => {

const one = new Reporters.Incrementer(1);
const two = new Reporters.Writer(true);
Expand All @@ -114,12 +114,20 @@ describe('Monitor', () => {
// Verion 8 of node misses this change inside monitor, so force it here
const foo = monitor._reporters.get('foo');
foo.destroyed = true;
foo.emit('error');
foo.emit('error', new Error('bar'));
monitor.push(() => ({ id: 3, number: 100 }));

expect(two.data).to.have.length(2);
expect(two.data).to.equal([{ id: 1, number: 3 }, { id: 2, number: 6 }]);
console.error = err;

await new Promise((resolve) => {

process.nextTick(() => {

console.error = err;
resolve();
});
});
});

describe('start()', () => {
Expand Down

0 comments on commit 1bf8478

Please sign in to comment.