-
Notifications
You must be signed in to change notification settings - Fork 29.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: always invoke callback before emitting error #29293
Changes from all commits
9f4a28f
1cd617b
5da8563
14428f3
eb9099b
60dac52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -155,6 +155,11 @@ function WritableState(options, stream, isDuplex) { | |
// Should .destroy() be called after 'finish' (and potentially 'end') | ||
this.autoDestroy = !!options.autoDestroy; | ||
|
||
// Indicates whether the stream has errored. When true all write() calls | ||
// should return false. This is needed since when autoDestroy | ||
// is disabled we need a way to tell whether the stream has failed. | ||
this.errored = false; | ||
|
||
// Count buffered requests | ||
this.bufferedRequestCount = 0; | ||
|
||
|
@@ -394,7 +399,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { | |
if (!ret) | ||
state.needDrain = true; | ||
|
||
if (state.writing || state.corked) { | ||
if (state.writing || state.corked || state.errored) { | ||
var last = state.lastBufferedRequest; | ||
state.lastBufferedRequest = { | ||
chunk, | ||
|
@@ -413,7 +418,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { | |
doWrite(stream, state, false, len, chunk, encoding, cb); | ||
} | ||
|
||
return ret; | ||
// Return false if errored or destroyed in order to break | ||
// any synchronous while(stream.write(data)) loops. | ||
return ret && !state.errored && !state.destroyed; | ||
} | ||
|
||
function doWrite(stream, state, writev, len, chunk, encoding, cb) { | ||
|
@@ -430,18 +437,11 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { | |
state.sync = false; | ||
} | ||
|
||
function onwriteError(stream, state, sync, er, cb) { | ||
function onwriteError(stream, state, er, cb) { | ||
--state.pendingcb; | ||
|
||
if (sync) { | ||
// Defer the callback if we are being called synchronously | ||
// to avoid piling up things on the stack | ||
process.nextTick(cb, er); | ||
} else { | ||
// The caller expect this to happen before if | ||
// it is async | ||
cb(er); | ||
} | ||
cb(er); | ||
// This can emit error, but error must always follow cb. | ||
errorOrDestroy(stream, er); | ||
} | ||
|
||
|
@@ -458,9 +458,14 @@ function onwrite(stream, er) { | |
state.length -= state.writelen; | ||
state.writelen = 0; | ||
|
||
if (er) | ||
onwriteError(stream, state, sync, er, cb); | ||
else { | ||
if (er) { | ||
state.errored = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot use |
||
if (sync) { | ||
process.nextTick(onwriteError, stream, state, er, cb); | ||
} else { | ||
onwriteError(stream, state, er, cb); | ||
} | ||
} else { | ||
// Check if we're actually ready to finish, but don't emit yet | ||
var finished = needFinish(state) || stream.destroyed; | ||
|
||
|
@@ -611,7 +616,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', { | |
function needFinish(state) { | ||
return (state.ending && | ||
state.length === 0 && | ||
!state.errorEmitted && | ||
!state.errored && | ||
state.bufferedRequest === null && | ||
!state.finished && | ||
!state.writing); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,10 @@ function destroy(err, cb) { | |
const r = this._readableState; | ||
const w = this._writableState; | ||
|
||
if (w && err) { | ||
w.errored = true; | ||
} | ||
|
||
if ((w && w.destroyed) || (r && r.destroyed)) { | ||
if (cb) { | ||
cb(err); | ||
|
@@ -50,10 +54,12 @@ function destroy(err, cb) { | |
this._destroy(err || null, (err) => { | ||
const emitClose = (w && w.emitClose) || (r && r.emitClose); | ||
if (cb) { | ||
// Invoke callback before scheduling emitClose so that callback | ||
// can schedule before. | ||
cb(err); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When passing explicit |
||
if (emitClose) { | ||
process.nextTick(emitCloseNT, this); | ||
} | ||
cb(err); | ||
} else if (needError(this, err)) { | ||
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err); | ||
} else if (emitClose) { | ||
|
@@ -91,6 +97,7 @@ function undestroy() { | |
|
||
if (w) { | ||
w.destroyed = false; | ||
w.errored = false; | ||
w.ended = false; | ||
w.ending = false; | ||
w.finalCalled = false; | ||
|
@@ -110,6 +117,10 @@ function errorOrDestroy(stream, err) { | |
const r = stream._readableState; | ||
const w = stream._writableState; | ||
|
||
if (w & err) { | ||
w.errored = true; | ||
} | ||
|
||
if ((r && r.autoDestroy) || (w && w.autoDestroy)) | ||
stream.destroy(err); | ||
else if (needError(stream, err)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,7 +67,10 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => { | |
h2header.writeIntBE(1, 0, 3); // Length: 1 | ||
h2header.writeIntBE(i, 5, 4); // Stream ID | ||
// 0x88 = :status: 200 | ||
conn.write(Buffer.concat([h2header, Buffer.from([0x88])])); | ||
if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) { | ||
process.nextTick(writeRequests); | ||
break; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test assumed a synchronous error to break the loop. Which is no longer the case. |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
'use strict'; | ||
const common = require('../common'); | ||
const { Writable } = require('stream'); | ||
const assert = require('assert'); | ||
|
||
// Ensure callback is always invoked before | ||
// error is emitted. Regardless if error was | ||
// sync or async. | ||
|
||
{ | ||
let callbackCalled = false; | ||
// Sync Error | ||
const writable = new Writable({ | ||
write: common.mustCall((buf, enc, cb) => { | ||
cb(new Error()); | ||
}) | ||
}); | ||
writable.on('error', common.mustCall(() => { | ||
assert.strictEqual(callbackCalled, true); | ||
})); | ||
writable.write('hi', common.mustCall(() => { | ||
callbackCalled = true; | ||
})); | ||
} | ||
|
||
{ | ||
let callbackCalled = false; | ||
// Async Error | ||
const writable = new Writable({ | ||
write: common.mustCall((buf, enc, cb) => { | ||
process.nextTick(cb, new Error()); | ||
}) | ||
}); | ||
writable.on('error', common.mustCall(() => { | ||
assert.strictEqual(callbackCalled, true); | ||
})); | ||
writable.write('hi', common.mustCall(() => { | ||
callbackCalled = true; | ||
})); | ||
} | ||
|
||
{ | ||
// Sync Error | ||
const writable = new Writable({ | ||
write: common.mustCall((buf, enc, cb) => { | ||
cb(new Error()); | ||
}) | ||
}); | ||
|
||
writable.on('error', common.mustCall()); | ||
|
||
let cnt = 0; | ||
// Ensure we don't live lock on sync error | ||
while (writable.write('a')) | ||
cnt++; | ||
|
||
assert.strictEqual(cnt, 0); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,4 +16,8 @@ const socket = new JSStreamWrap(new Duplex({ | |
}) | ||
})); | ||
|
||
assert.throws(() => socket.end('foo'), /Error: write EPROTO/); | ||
socket.end('foo'); | ||
socket.on('error', common.expectsError({ | ||
type: Error, | ||
message: 'write EPROTO' | ||
})); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test assumed a synchronous error which is no longer the case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See updated tests where "incorrectly" a synchronous error was assumed to break such loops.