Skip to content

Commit

Permalink
Revert "stream: invoke callback before emitting error always"
Browse files Browse the repository at this point in the history
This reverts commit 3de5eae.

PR-URL: #29741
Reviewed-By: Rich Trott <[email protected]>
Reviewed-By: Jiawen Geng <[email protected]>
Reviewed-By: Colin Ihrig <[email protected]>
  • Loading branch information
richardlau authored and Trott committed Sep 28, 2019
1 parent 35bfe0e commit 95792a7
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 122 deletions.
3 changes: 1 addition & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,7 @@ The `writable.write()` method writes some data to the stream, and calls the
supplied `callback` once the data has been fully handled. If an error
occurs, the `callback` *may or may not* be called with the error as its
first argument. To reliably detect write errors, add a listener for the
`'error'` event. If `callback` is called with an error, it will be called
before the `'error'` event is emitted.
`'error'` event.

The return value is `true` if the internal buffer is less than the
`highWaterMark` configured when the stream was created after admitting `chunk`.
Expand Down
37 changes: 16 additions & 21 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,6 @@ function WritableState(options, stream, isDuplex) {
// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!(options && options.autoDestroy);

// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;

// Count buffered requests
this.bufferedRequestCount = 0;

Expand Down Expand Up @@ -406,7 +401,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;

if (state.writing || state.corked || state.errored) {
if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
Expand All @@ -425,9 +420,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
doWrite(stream, state, false, len, chunk, encoding, cb);
}

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
return ret;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
Expand All @@ -444,11 +437,18 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.sync = false;
}

function onwriteError(stream, state, er, cb) {
function onwriteError(stream, state, sync, er, cb) {
--state.pendingcb;

cb(er);
// This can emit error, but error must always follow cb.
if (sync) {
// Defer the callback if we are being called synchronously
// to avoid piling up things on the stack
process.nextTick(cb, er);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
}
errorOrDestroy(stream, er);
}

Expand All @@ -465,14 +465,9 @@ function onwrite(stream, er) {
state.length -= state.writelen;
state.writelen = 0;

if (er) {
state.errored = true;
if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
onwriteError(stream, state, er, cb);
}
} else {
if (er)
onwriteError(stream, state, sync, er, cb);
else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;

Expand Down Expand Up @@ -627,7 +622,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
function needFinish(state) {
return (state.ending &&
state.length === 0 &&
!state.errored &&
!state.errorEmitted &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
Expand Down
13 changes: 1 addition & 12 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;

if (w && err) {
w.errored = true;
}

if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
Expand All @@ -54,12 +50,10 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => {
const emitClose = (w && w.emitClose) || (r && r.emitClose);
if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
cb(err);
if (emitClose) {
process.nextTick(emitCloseNT, this);
}
cb(err);
} else if (needError(this, err)) {
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
} else if (emitClose) {
Expand Down Expand Up @@ -97,7 +91,6 @@ function undestroy() {

if (w) {
w.destroyed = false;
w.errored = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
Expand All @@ -117,10 +110,6 @@ function errorOrDestroy(stream, err) {
const r = stream._readableState;
const w = stream._writableState;

if (w & err) {
w.errored = true;
}

if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (needError(stream, err))
Expand Down
5 changes: 1 addition & 4 deletions test/parallel/test-http2-reset-flood.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => {
h2header.writeIntBE(1, 0, 3); // Length: 1
h2header.writeIntBE(i, 5, 4); // Stream ID
// 0x88 = :status: 200
if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) {
process.nextTick(writeRequests);
break;
}
conn.write(Buffer.concat([h2header, Buffer.from([0x88])]));
}
}

Expand Down
14 changes: 0 additions & 14 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,6 @@ const assert = require('assert');
assert.strictEqual(write.destroyed, true);
}

{
const write = new Writable({
write(chunk, enc, cb) {
this.destroy(new Error('asd'));
cb();
}
});

write.on('error', common.mustCall());
write.on('finish', common.mustNotCall());
write.end('asd');
assert.strictEqual(write.destroyed, true);
}

{
const write = new Writable({
write(chunk, enc, cb) { cb(); }
Expand Down
58 changes: 0 additions & 58 deletions test/parallel/test-stream-writable-write-cb-error.js

This file was deleted.

6 changes: 1 addition & 5 deletions test/parallel/test-wrap-js-stream-exceptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,4 @@ const socket = new JSStreamWrap(new Duplex({
})
}));

socket.end('foo');
socket.on('error', common.expectsError({
type: Error,
message: 'write EPROTO'
}));
assert.throws(() => socket.end('foo'), /Error: write EPROTO/);
14 changes: 8 additions & 6 deletions test/parallel/test-zlib-write-after-close.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const zlib = require('zlib');
zlib.gzip('hello', common.mustCall(function(err, out) {
const unzip = zlib.createGunzip();
unzip.close(common.mustCall());

unzip.write(out);
unzip.on('error', common.expectsError({
code: 'ERR_STREAM_DESTROYED',
type: Error
}));
common.expectsError(
() => unzip.write(out),
{
code: 'ERR_STREAM_DESTROYED',
type: Error,
message: 'Cannot call write after a stream was destroyed'
}
);
}));

0 comments on commit 95792a7

Please sign in to comment.