Skip to content

Commit

Permalink
stream: align stream.Duplex with net.Socket
Browse files Browse the repository at this point in the history
stream.Duplex and net.Socket slightly differs in behavior.

Especially when it comes to the case where one side never
becomes readable or writable. This aligns Duplex with the
behavior of Socket.

PR-URL: #32139
Reviewed-By: Anto Aravinth <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
ronag committed Mar 25, 2020
1 parent 05f1df5 commit 388cef6
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 22 deletions.
16 changes: 0 additions & 16 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ function Duplex(options) {

if (options.allowHalfOpen === false) {
this.allowHalfOpen = false;
this.once('end', onend);
}
}
}
Expand Down Expand Up @@ -128,18 +127,3 @@ ObjectDefineProperties(Duplex.prototype, {
}
}
});

// The no-half-open enforcer
function onend() {
// If the writable side ended, then we're ok.
if (this._writableState.ended)
return;

// No more data can be written.
// But allow more writes to happen in this tick.
process.nextTick(onEndNT, this);
}

function onEndNT(self) {
self.end();
}
21 changes: 19 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1217,17 +1217,34 @@ function endReadableNT(state, stream) {
state.endEmitted = true;
stream.emit('end');

if (state.autoDestroy) {
if (stream.writable && stream.allowHalfOpen === false) {
process.nextTick(endWritableNT, state, stream);
} else if (state.autoDestroy) {
// In case of duplex streams we need a way to detect
// if the writable side is ready for autoDestroy as well
const wState = stream._writableState;
if (!wState || (wState.autoDestroy && wState.finished)) {
const autoDestroy = !wState || (
wState.autoDestroy &&
// We don't expect the writable to ever 'finish'
// if writable is explicitly set to false.
(wState.finished || wState.writable === false)
);

if (autoDestroy) {
stream.destroy();
}
}
}
}

function endWritableNT(state, stream) {
const writable = stream.writable && !stream.writableEnded &&
!stream.destroyed;
if (writable) {
stream.end();
}
}

Readable.from = function(iterable, opts) {
if (from === undefined) {
from = require('internal/streams/from');
Expand Down
10 changes: 8 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,13 @@ function finish(stream, state) {
// In case of duplex streams we need a way to detect
// if the readable side is ready for autoDestroy as well
const rState = stream._readableState;
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
const autoDestroy = !rState || (
rState.autoDestroy &&
// We don't expect the readable to ever 'end'
// if readable is explicitly set to false.
(rState.endEmitted || rState.readable === false)
);
if (autoDestroy) {
stream.destroy();
}
}
Expand Down Expand Up @@ -748,7 +754,7 @@ ObjectDefineProperties(Writable.prototype, {
// Compat. The user might manually disable writable side through
// deprecated setter.
return !!w && w.writable !== false && !w.destroyed && !w.errored &&
!w.ending;
!w.ending && !w.ended;
},
set(val) {
// Backwards compatible.
Expand Down
44 changes: 44 additions & 0 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,47 @@ const assert = require('assert');

new MyDuplex();
}

{
const duplex = new Duplex({
writable: false,
autoDestroy: true,
write(chunk, enc, cb) { cb(); },
read() {},
});
duplex.push(null);
duplex.resume();
duplex.on('close', common.mustCall());
}

{
const duplex = new Duplex({
readable: false,
autoDestroy: true,
write(chunk, enc, cb) { cb(); },
read() {},
});
duplex.end();
duplex.on('close', common.mustCall());
}

{
const duplex = new Duplex({
allowHalfOpen: false,
autoDestroy: true,
write(chunk, enc, cb) { cb(); },
read() {},
});
duplex.push(null);
duplex.resume();
const orgEnd = duplex.end;
duplex.end = common.mustNotCall();
duplex.on('end', () => {
// Ensure end() is called in next tick to allow
// any pending writes to be invoked first.
process.nextTick(() => {
duplex.end = common.mustCall(orgEnd);
});
});
duplex.on('close', common.mustCall());
}
4 changes: 2 additions & 2 deletions test/parallel/test-stream-duplex-end.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const Duplex = require('stream').Duplex;
});
assert.strictEqual(stream.allowHalfOpen, false);
stream.on('finish', common.mustCall());
assert.strictEqual(stream.listenerCount('end'), 1);
assert.strictEqual(stream.listenerCount('end'), 0);
stream.resume();
stream.push(null);
}
Expand All @@ -35,7 +35,7 @@ const Duplex = require('stream').Duplex;
assert.strictEqual(stream.allowHalfOpen, false);
stream._writableState.ended = true;
stream.on('finish', common.mustNotCall());
assert.strictEqual(stream.listenerCount('end'), 1);
assert.strictEqual(stream.listenerCount('end'), 0);
stream.resume();
stream.push(null);
}

0 comments on commit 388cef6

Please sign in to comment.