From 9e225a4668e6fb8b106772d855fdc75894381e86 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 6 Aug 2019 22:16:05 +0200 Subject: [PATCH 1/4] stream: don't flush destroyed writable --- lib/_stream_writable.js | 33 ++++++++++++- ...est-http2-server-stream-session-destroy.js | 6 ++- test/parallel/test-stream-writable-destroy.js | 46 +++++++++++++++++++ test/parallel/test-stream-write-destroy.js | 17 +++++-- 4 files changed, 96 insertions(+), 6 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index e212881c4ac555..ed1311fcc49ef7 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -288,6 +288,24 @@ Writable.prototype.write = function(chunk, encoding, cb) { encoding = null; } + if (this._writableState.ending) { + const err = new ERR_STREAM_WRITE_AFTER_END(); + if (typeof cb === 'function') { + process.nextTick(cb, err); + } + errorOrDestroy(this, err); + return false; + } + + if (this.destroyed) { + const err = new ERR_STREAM_DESTROYED('write'); + if (typeof cb === 'function') { + process.nextTick(cb, err); + } + errorOrDestroy(this, err); + return false; + } + if (isBuf) encoding = 'buffer'; else if (!encoding) @@ -736,7 +754,20 @@ Object.defineProperty(Writable.prototype, 'writableFinished', { } }); -Writable.prototype.destroy = destroyImpl.destroy; +const destroy = destroyImpl.destroy; +Writable.prototype.destroy = function(err, cb) { + const state = this._writableState; + if (!state.destroyed) { + for (let entry = state.bufferedRequest; entry; entry = entry.next) { + entry.callback(new ERR_STREAM_DESTROYED('write')); + } + state.bufferedRequest = null; + state.lastBufferedRequest = null; + state.bufferedRequestCount = 0; + } + destroy.call(this, err, cb); +} + Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { cb(err); diff --git a/test/parallel/test-http2-server-stream-session-destroy.js b/test/parallel/test-http2-server-stream-session-destroy.js index 6a262e2736e4bc..daa618d5109107 100644 --- a/test/parallel/test-http2-server-stream-session-destroy.js +++ b/test/parallel/test-http2-server-stream-session-destroy.js @@ -39,7 +39,11 @@ server.on('stream', common.mustCall((stream) => { code: 'ERR_STREAM_WRITE_AFTER_END', message: 'write after end' })); - assert.strictEqual(stream.write('data'), false); + assert.strictEqual(stream.write('data', common.expectsError({ + type: Error, + code: 'ERR_STREAM_WRITE_AFTER_END', + message: 'write after end' + })), false); })); server.listen(0, common.mustCall(() => { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index a431d6d48d1c8e..ac107ecbb7034b 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -232,3 +232,49 @@ const assert = require('assert'); write._undestroy(); write.end(); } + +{ + const write = new Writable(); + + write.destroy(); + write.on('error', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); + write.write('asd', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + write.on('error', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); + + write.cork(); + write.write('asd', common.mustCall()); + write.uncork(); + + write.cork(); + write.write('asd', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); + write.destroy(); + write.write('asd', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); + write.uncork(); +} diff --git a/test/parallel/test-stream-write-destroy.js b/test/parallel/test-stream-write-destroy.js index 83b329a6a8a7b3..297217eb4accc6 100644 --- a/test/parallel/test-stream-write-destroy.js +++ b/test/parallel/test-stream-write-destroy.js @@ -24,7 +24,16 @@ for (const withPendingData of [ false, true ]) { w.on('drain', () => drains++); w.on('finish', () => finished = true); - w.write('abc', () => chunksWritten++); + function onWrite(err) { + if (err) { + assert.strictEqual(w.destroyed, true); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + } else { + chunksWritten++; + } + } + + w.write('abc', onWrite); assert.strictEqual(chunksWritten, 0); assert.strictEqual(drains, 0); callbacks.shift()(); @@ -34,14 +43,14 @@ for (const withPendingData of [ false, true ]) { if (withPendingData) { // Test 2 cases: There either is or is not data still in the write queue. // (The second write will never actually get executed either way.) - w.write('def', () => chunksWritten++); + w.write('def', onWrite); } if (useEnd) { // Again, test 2 cases: Either we indicate that we want to end the // writable or not. - w.end('ghi', () => chunksWritten++); + w.end('ghi', onWrite); } else { - w.write('ghi', () => chunksWritten++); + w.write('ghi', onWrite); } assert.strictEqual(chunksWritten, 1); From a53ce3c0e072d54a6c3cd4a5b7b5440ab23e66ed Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 19 Aug 2019 21:01:33 +0200 Subject: [PATCH 2/4] fixup --- lib/_stream_writable.js | 47 ++++++++++++++++------------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index ed1311fcc49ef7..c883911b5d0b0e 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -275,35 +275,28 @@ function validChunk(stream, state, chunk, cb) { Writable.prototype.write = function(chunk, encoding, cb) { const state = this._writableState; - var ret = false; - const isBuf = !state.objectMode && Stream._isUint8Array(chunk); - - // Do not use Object.getPrototypeOf as it is slower since V8 7.3. - if (isBuf && !(chunk instanceof Buffer)) { - chunk = Stream._uint8ArrayToBuffer(chunk); - } if (typeof encoding === 'function') { cb = encoding; encoding = null; } - if (this._writableState.ending) { - const err = new ERR_STREAM_WRITE_AFTER_END(); - if (typeof cb === 'function') { - process.nextTick(cb, err); - } + if (typeof cb !== 'function') + cb = nop; + + if (state.ending || state.destroyed) { + const err = state.ending ? new ERR_STREAM_WRITE_AFTER_END() : + new ERR_STREAM_DESTROYED('write'); + process.nextTick(cb, err); errorOrDestroy(this, err); return false; } - if (this.destroyed) { - const err = new ERR_STREAM_DESTROYED('write'); - if (typeof cb === 'function') { - process.nextTick(cb, err); - } - errorOrDestroy(this, err); - return false; + const isBuf = !state.objectMode && Stream._isUint8Array(chunk); + + // Do not use Object.getPrototypeOf as it is slower since V8 7.3. + if (isBuf && !(chunk instanceof Buffer)) { + chunk = Stream._uint8ArrayToBuffer(chunk); } if (isBuf) @@ -311,17 +304,13 @@ Writable.prototype.write = function(chunk, encoding, cb) { else if (!encoding) encoding = state.defaultEncoding; - if (typeof cb !== 'function') - cb = nop; - - if (state.ending) + if (state.ending) { writeAfterEnd(this, cb); - else if (isBuf || validChunk(this, state, chunk, cb)) { + return false; + } else if (isBuf || validChunk(this, state, chunk, cb)) { state.pendingcb++; - ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb); + return writeOrBuffer(this, state, isBuf, chunk, encoding, cb); } - - return ret; }; Writable.prototype.cork = function() { @@ -759,14 +748,14 @@ Writable.prototype.destroy = function(err, cb) { const state = this._writableState; if (!state.destroyed) { for (let entry = state.bufferedRequest; entry; entry = entry.next) { - entry.callback(new ERR_STREAM_DESTROYED('write')); + process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write')); } state.bufferedRequest = null; state.lastBufferedRequest = null; state.bufferedRequestCount = 0; } destroy.call(this, err, cb); -} +}; Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { From c4ce1256fbaaf656b77b9653ae7187813ecf360b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 23 Aug 2019 19:59:43 +0200 Subject: [PATCH 3/4] fixup: destroy needs to return self --- lib/_stream_writable.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index c883911b5d0b0e..1c089a2bc0698b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -755,6 +755,7 @@ Writable.prototype.destroy = function(err, cb) { state.bufferedRequestCount = 0; } destroy.call(this, err, cb); + return this; }; Writable.prototype._undestroy = destroyImpl.undestroy; From 74380a815662a57936d03cb50ba662e3d1421652 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 26 Aug 2019 11:46:25 +0200 Subject: [PATCH 4/4] fixup --- lib/_stream_writable.js | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 1c089a2bc0698b..6c89f9f662e875 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -275,23 +275,7 @@ function validChunk(stream, state, chunk, cb) { Writable.prototype.write = function(chunk, encoding, cb) { const state = this._writableState; - - if (typeof encoding === 'function') { - cb = encoding; - encoding = null; - } - - if (typeof cb !== 'function') - cb = nop; - - if (state.ending || state.destroyed) { - const err = state.ending ? new ERR_STREAM_WRITE_AFTER_END() : - new ERR_STREAM_DESTROYED('write'); - process.nextTick(cb, err); - errorOrDestroy(this, err); - return false; - } - + var ret = false; const isBuf = !state.objectMode && Stream._isUint8Array(chunk); // Do not use Object.getPrototypeOf as it is slower since V8 7.3. @@ -299,18 +283,31 @@ Writable.prototype.write = function(chunk, encoding, cb) { chunk = Stream._uint8ArrayToBuffer(chunk); } + if (typeof encoding === 'function') { + cb = encoding; + encoding = null; + } + if (isBuf) encoding = 'buffer'; else if (!encoding) encoding = state.defaultEncoding; + if (typeof cb !== 'function') + cb = nop; + if (state.ending) { writeAfterEnd(this, cb); - return false; + } else if (state.destroyed) { + const err = new ERR_STREAM_DESTROYED('write'); + process.nextTick(cb, err); + errorOrDestroy(this, err); } else if (isBuf || validChunk(this, state, chunk, cb)) { state.pendingcb++; - return writeOrBuffer(this, state, isBuf, chunk, encoding, cb); + ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb); } + + return ret; }; Writable.prototype.cork = function() {