From 388cef61e8a4859b7505f7b5cf988eba27ce17b4 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 7 Mar 2020 23:28:09 +0100 Subject: [PATCH] stream: align stream.Duplex with net.Socket stream.Duplex and net.Socket slightly differs in behavior. Especially when it comes to the case where one side never becomes readable or writable. This aligns Duplex with the behavior of Socket. PR-URL: https://github.com/nodejs/node/pull/32139 Reviewed-By: Anto Aravinth Reviewed-By: Matteo Collina Reviewed-By: James M Snell --- lib/_stream_duplex.js | 16 -------- lib/_stream_readable.js | 21 +++++++++- lib/_stream_writable.js | 10 ++++- test/parallel/test-stream-duplex-destroy.js | 44 +++++++++++++++++++++ test/parallel/test-stream-duplex-end.js | 4 +- 5 files changed, 73 insertions(+), 22 deletions(-) diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index b832c973a1ee18..fe2281df471dc7 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -66,7 +66,6 @@ function Duplex(options) { if (options.allowHalfOpen === false) { this.allowHalfOpen = false; - this.once('end', onend); } } } @@ -128,18 +127,3 @@ ObjectDefineProperties(Duplex.prototype, { } } }); - -// The no-half-open enforcer -function onend() { - // If the writable side ended, then we're ok. - if (this._writableState.ended) - return; - - // No more data can be written. - // But allow more writes to happen in this tick. - process.nextTick(onEndNT, this); -} - -function onEndNT(self) { - self.end(); -} diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index dfbd023d24e185..1df50ba200fe0f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -1217,17 +1217,34 @@ function endReadableNT(state, stream) { state.endEmitted = true; stream.emit('end'); - if (state.autoDestroy) { + if (stream.writable && stream.allowHalfOpen === false) { + process.nextTick(endWritableNT, state, stream); + } else if (state.autoDestroy) { // In case of duplex streams we need a way to detect // if the writable side is ready for autoDestroy as well const wState = stream._writableState; - if (!wState || (wState.autoDestroy && wState.finished)) { + const autoDestroy = !wState || ( + wState.autoDestroy && + // We don't expect the writable to ever 'finish' + // if writable is explicitly set to false. + (wState.finished || wState.writable === false) + ); + + if (autoDestroy) { stream.destroy(); } } } } +function endWritableNT(state, stream) { + const writable = stream.writable && !stream.writableEnded && + !stream.destroyed; + if (writable) { + stream.end(); + } +} + Readable.from = function(iterable, opts) { if (from === undefined) { from = require('internal/streams/from'); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index b24192101c71a3..c3a7a35d2b3f6f 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -675,7 +675,13 @@ function finish(stream, state) { // In case of duplex streams we need a way to detect // if the readable side is ready for autoDestroy as well const rState = stream._readableState; - if (!rState || (rState.autoDestroy && rState.endEmitted)) { + const autoDestroy = !rState || ( + rState.autoDestroy && + // We don't expect the readable to ever 'end' + // if readable is explicitly set to false. + (rState.endEmitted || rState.readable === false) + ); + if (autoDestroy) { stream.destroy(); } } @@ -748,7 +754,7 @@ ObjectDefineProperties(Writable.prototype, { // Compat. The user might manually disable writable side through // deprecated setter. return !!w && w.writable !== false && !w.destroyed && !w.errored && - !w.ending; + !w.ending && !w.ended; }, set(val) { // Backwards compatible. diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js index 3c38d2c364051c..e7c91ec797beb3 100644 --- a/test/parallel/test-stream-duplex-destroy.js +++ b/test/parallel/test-stream-duplex-destroy.js @@ -194,3 +194,47 @@ const assert = require('assert'); new MyDuplex(); } + +{ + const duplex = new Duplex({ + writable: false, + autoDestroy: true, + write(chunk, enc, cb) { cb(); }, + read() {}, + }); + duplex.push(null); + duplex.resume(); + duplex.on('close', common.mustCall()); +} + +{ + const duplex = new Duplex({ + readable: false, + autoDestroy: true, + write(chunk, enc, cb) { cb(); }, + read() {}, + }); + duplex.end(); + duplex.on('close', common.mustCall()); +} + +{ + const duplex = new Duplex({ + allowHalfOpen: false, + autoDestroy: true, + write(chunk, enc, cb) { cb(); }, + read() {}, + }); + duplex.push(null); + duplex.resume(); + const orgEnd = duplex.end; + duplex.end = common.mustNotCall(); + duplex.on('end', () => { + // Ensure end() is called in next tick to allow + // any pending writes to be invoked first. + process.nextTick(() => { + duplex.end = common.mustCall(orgEnd); + }); + }); + duplex.on('close', common.mustCall()); +} diff --git a/test/parallel/test-stream-duplex-end.js b/test/parallel/test-stream-duplex-end.js index 8ee19346d3abe5..2c7706146eb882 100644 --- a/test/parallel/test-stream-duplex-end.js +++ b/test/parallel/test-stream-duplex-end.js @@ -22,7 +22,7 @@ const Duplex = require('stream').Duplex; }); assert.strictEqual(stream.allowHalfOpen, false); stream.on('finish', common.mustCall()); - assert.strictEqual(stream.listenerCount('end'), 1); + assert.strictEqual(stream.listenerCount('end'), 0); stream.resume(); stream.push(null); } @@ -35,7 +35,7 @@ const Duplex = require('stream').Duplex; assert.strictEqual(stream.allowHalfOpen, false); stream._writableState.ended = true; stream.on('finish', common.mustNotCall()); - assert.strictEqual(stream.listenerCount('end'), 1); + assert.strictEqual(stream.listenerCount('end'), 0); stream.resume(); stream.push(null); }