Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: fix writable.end callback behavior #34101

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,9 @@ Is `true` after [`writable.destroy()`][writable-destroy] has been called.
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/34101
description: The `callback` is invoked before 'finish' or on error.
- version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/29747
description: The `callback` is invoked if 'finish' or 'error' is emitted.
Expand All @@ -428,15 +431,13 @@ changes:
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding if `chunk` is a string
* `callback` {Function} Optional callback for when the stream finishes
or errors
* `callback` {Function} Callback for when the stream is finished.
mcollina marked this conversation as resolved.
Show resolved Hide resolved
ronag marked this conversation as resolved.
Show resolved Hide resolved
* Returns: {this}

Calling the `writable.end()` method signals that no more data will be written
to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one
final additional chunk of data to be written immediately before closing the
stream. If provided, the optional `callback` function is attached as a listener
for the [`'finish'`][] and the `'error'` event.
stream.

Calling the [`stream.write()`][stream-write] method after calling
[`stream.end()`][stream-end] will raise an error.
Expand Down Expand Up @@ -592,7 +593,7 @@ changes:
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string. **Default:** `'utf8'`
* `callback` {Function} Callback for when this chunk of data is flushed
* `callback` {Function} Callback for when this chunk of data is flushed.
* Returns: {boolean} `false` if the stream wishes for the calling code to
wait for the `'drain'` event to be emitted before continuing to write
additional data; otherwise `true`.
Expand Down
53 changes: 25 additions & 28 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ ObjectSetPrototypeOf(Writable, Stream);

function nop() {}

const kOnFinished = Symbol('kOnFinished');

function WritableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
Expand Down Expand Up @@ -185,6 +187,8 @@ function WritableState(options, stream, isDuplex) {
// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false;

this[kOnFinished] = [];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible optimization to start this as null.

}

function resetBuffer(state) {
Expand Down Expand Up @@ -411,7 +415,7 @@ function onwriteError(stream, state, er, cb) {
// not enabled. Passing `er` here doesn't make sense since
// it's related to one specific write, not to the buffered
// writes.
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
errorBuffer(state);
// This can emit error, but error must always follow cb.
errorOrDestroy(stream, er);
}
Expand Down Expand Up @@ -487,14 +491,14 @@ function afterWrite(stream, state, count, cb) {
}

if (state.destroyed) {
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
errorBuffer(state);
}

finishMaybe(stream, state);
}

// If there's something in the buffer waiting, then invoke callbacks.
function errorBuffer(state, err) {
function errorBuffer(state) {
if (state.writing) {
return;
}
Expand All @@ -503,7 +507,11 @@ function errorBuffer(state, err) {
const { chunk, callback } = state.buffered[n];
const len = state.objectMode ? 1 : chunk.length;
state.length -= len;
callback(err);
callback(new ERR_STREAM_DESTROYED('write'));
}

for (const callback of state[kOnFinished].splice(0)) {
callback(new ERR_STREAM_DESTROYED('end'));
}

resetBuffer(state);
Expand Down Expand Up @@ -611,10 +619,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
}

if (typeof cb === 'function') {
if (err || state.finished)
if (err || state.finished) {
process.nextTick(cb, err);
else
onFinished(this, cb);
} else {
state[kOnFinished].push(cb);
}
}

return this;
Expand All @@ -636,6 +645,9 @@ function callFinal(stream, state) {
stream._final((err) => {
state.pendingcb--;
if (err) {
for (const callback of state[kOnFinished].splice(0)) {
callback(err);
}
errorOrDestroy(stream, err, state.sync);
} else if (needFinish(state)) {
state.prefinished = true;
Expand Down Expand Up @@ -683,6 +695,11 @@ function finish(stream, state) {
return;

state.finished = true;

for (const callback of state[kOnFinished].splice(0)) {
callback();
}

stream.emit('finish');

if (state.autoDestroy) {
Expand All @@ -701,26 +718,6 @@ function finish(stream, state) {
}
}

// TODO(ronag): Avoid using events to implement internal logic.
function onFinished(stream, cb) {
function onerror(err) {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb(err);
if (stream.listenerCount('error') === 0) {
stream.emit('error', err);
}
}

function onfinish() {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb();
}
stream.on('finish', onfinish);
stream.prependListener('error', onerror);
}

ObjectDefineProperties(Writable.prototype, {

destroyed: {
Expand Down Expand Up @@ -800,7 +797,7 @@ const destroy = destroyImpl.destroy;
Writable.prototype.destroy = function(err, cb) {
const state = this._writableState;
if (!state.destroyed) {
process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
process.nextTick(errorBuffer, state);
}
destroy.call(this, err, cb);
return this;
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-stream-transform-final-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const t = new stream.Transform({
t.on('finish', common.mustCall(function() {
state++;
// finishListener
assert.strictEqual(state, 14);
assert.strictEqual(state, 15);
}, 1));
t.on('end', common.mustCall(function() {
state++;
Expand All @@ -106,5 +106,5 @@ t.write(4);
t.end(7, common.mustCall(function() {
state++;
// endMethodCallback
assert.strictEqual(state, 15);
assert.strictEqual(state, 14);
}, 1));
4 changes: 2 additions & 2 deletions test/parallel/test-stream-transform-final.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const t = new stream.Transform({
t.on('finish', common.mustCall(function() {
state++;
// finishListener
assert.strictEqual(state, 14);
assert.strictEqual(state, 15);
}, 1));
t.on('end', common.mustCall(function() {
state++;
Expand All @@ -108,5 +108,5 @@ t.write(4);
t.end(7, common.mustCall(function() {
state++;
// endMethodCallback
assert.strictEqual(state, 15);
assert.strictEqual(state, 14);
}, 1));
2 changes: 1 addition & 1 deletion test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ const assert = require('assert');
assert.strictEqual(err.message, 'asd');
}));
write.end('asd', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
mcollina marked this conversation as resolved.
Show resolved Hide resolved
}));
write.destroy(new Error('asd'));
}
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-stream-writable-end-cb-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ const stream = require('stream');
}));
writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
}

Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-writable-end-cb-uncaught.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ writable._final = (cb) => {

writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
2 changes: 1 addition & 1 deletion test/parallel/test-stream-write-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ for (const withPendingData of [ false, true ]) {
w.destroy();
assert.strictEqual(chunksWritten, 1);
callbacks.shift()();
assert.strictEqual(chunksWritten, 2);
assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2);
assert.strictEqual(callbacks.length, 0);
assert.strictEqual(drains, 1);

Expand Down