From 33e1144524be6c4e6dcd28a9f868d428d7af7c13 Mon Sep 17 00:00:00 2001 From: Thomas Watson Date: Mon, 30 Nov 2020 17:51:03 +0100 Subject: [PATCH] Reduse overhead of monitor pipeline --- lib/monitor.js | 29 ++++++++++++++++++++++------- package.json | 3 +-- test/monitor.js | 14 +++++++++++--- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index ab16cbd..c815655 100755 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,10 +1,10 @@ 'use strict'; const Os = require('os'); +const { pipeline } = require('stream'); const Hoek = require('@hapi/hoek'); const Oppsy = require('@hapi/oppsy'); -const Pumpify = require('pumpify'); const Package = require('../package.json'); const Utils = require('./utils'); @@ -119,15 +119,11 @@ module.exports = internals.Monitor = class { streamObjs.push(stream); } - if (streamObjs.length === 1) { - streamObjs.unshift(new Utils.NoOp()); - } - - this._reporters.set(reporterName, Pumpify.obj(streamObjs)).get(reporterName).on('error', (err) => { + this._reporters.set(reporterName, setupPipeline(streamObjs, (err) => { console.error(`There was a problem (${err}) in ${reporterName} and it has been destroyed.`); console.error(err); - }); + })); } this._state.report = true; @@ -208,3 +204,22 @@ module.exports = internals.Monitor = class { } } }; + +const setupPipeline = (streams, onError) => { + + const stream = streams[0]; + + if (streams.length === 1) { + stream.on('error', onError); + } + else { + pipeline(streams, (err) => { + + if (err) { + onError(err); + } + }); + } + + return stream; +}; diff --git a/package.json b/package.json index 3544f88..eb7758e 100644 --- a/package.json +++ b/package.json @@ -18,8 +18,7 @@ "dependencies": { "@hapi/hoek": "9.x.x", "@hapi/oppsy": "3.x.x", - "@hapi/validate": "1.x.x", - "pumpify": "2.x.x" + "@hapi/validate": "1.x.x" }, "devDependencies": { "@hapi/code": "8.x.x", diff --git a/test/monitor.js b/test/monitor.js index de0536d..54a5cc6 100755 --- a/test/monitor.js +++ b/test/monitor.js @@ -91,7 +91,7 @@ describe('Monitor', () => { monitor.stop(); }); - it('logs and destroys a reporter in the event of a stream error', { plan: 3 }, () => { + it('logs and destroys a reporter in the event of a stream error', { plan: 3 }, async () => { const one = new Reporters.Incrementer(1); const two = new Reporters.Writer(true); @@ -114,12 +114,20 @@ describe('Monitor', () => { // Verion 8 of node misses this change inside monitor, so force it here const foo = monitor._reporters.get('foo'); foo.destroyed = true; - foo.emit('error'); + foo.emit('error', new Error('bar')); monitor.push(() => ({ id: 3, number: 100 })); expect(two.data).to.have.length(2); expect(two.data).to.equal([{ id: 1, number: 3 }, { id: 2, number: 6 }]); - console.error = err; + + await new Promise((resolve) => { + + process.nextTick(() => { + + console.error = err; + resolve(); + }); + }); }); describe('start()', () => {