diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index e212881c4ac555..6c89f9f662e875 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -296,9 +296,13 @@ Writable.prototype.write = function(chunk, encoding, cb) { if (typeof cb !== 'function') cb = nop; - if (state.ending) + if (state.ending) { writeAfterEnd(this, cb); - else if (isBuf || validChunk(this, state, chunk, cb)) { + } else if (state.destroyed) { + const err = new ERR_STREAM_DESTROYED('write'); + process.nextTick(cb, err); + errorOrDestroy(this, err); + } else if (isBuf || validChunk(this, state, chunk, cb)) { state.pendingcb++; ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb); } @@ -736,7 +740,21 @@ Object.defineProperty(Writable.prototype, 'writableFinished', { } }); -Writable.prototype.destroy = destroyImpl.destroy; +const destroy = destroyImpl.destroy; +Writable.prototype.destroy = function(err, cb) { + const state = this._writableState; + if (!state.destroyed) { + for (let entry = state.bufferedRequest; entry; entry = entry.next) { + process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write')); + } + state.bufferedRequest = null; + state.lastBufferedRequest = null; + state.bufferedRequestCount = 0; + } + destroy.call(this, err, cb); + return this; +}; + Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { cb(err); diff --git a/test/parallel/test-http2-server-stream-session-destroy.js b/test/parallel/test-http2-server-stream-session-destroy.js index 6a262e2736e4bc..daa618d5109107 100644 --- a/test/parallel/test-http2-server-stream-session-destroy.js +++ b/test/parallel/test-http2-server-stream-session-destroy.js @@ -39,7 +39,11 @@ server.on('stream', common.mustCall((stream) => { code: 'ERR_STREAM_WRITE_AFTER_END', message: 'write after end' })); - assert.strictEqual(stream.write('data'), false); + assert.strictEqual(stream.write('data', common.expectsError({ + type: Error, + code: 'ERR_STREAM_WRITE_AFTER_END', + message: 'write after end' + })), false); })); server.listen(0, common.mustCall(() => { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index a431d6d48d1c8e..ac107ecbb7034b 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -232,3 +232,49 @@ const assert = require('assert'); write._undestroy(); write.end(); } + +{ + const write = new Writable(); + + write.destroy(); + write.on('error', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); + write.write('asd', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + write.on('error', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); + + write.cork(); + write.write('asd', common.mustCall()); + write.uncork(); + + write.cork(); + write.write('asd', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); + write.destroy(); + write.write('asd', common.expectsError({ + type: Error, + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed' + })); + write.uncork(); +} diff --git a/test/parallel/test-stream-write-destroy.js b/test/parallel/test-stream-write-destroy.js index 83b329a6a8a7b3..297217eb4accc6 100644 --- a/test/parallel/test-stream-write-destroy.js +++ b/test/parallel/test-stream-write-destroy.js @@ -24,7 +24,16 @@ for (const withPendingData of [ false, true ]) { w.on('drain', () => drains++); w.on('finish', () => finished = true); - w.write('abc', () => chunksWritten++); + function onWrite(err) { + if (err) { + assert.strictEqual(w.destroyed, true); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + } else { + chunksWritten++; + } + } + + w.write('abc', onWrite); assert.strictEqual(chunksWritten, 0); assert.strictEqual(drains, 0); callbacks.shift()(); @@ -34,14 +43,14 @@ for (const withPendingData of [ false, true ]) { if (withPendingData) { // Test 2 cases: There either is or is not data still in the write queue. // (The second write will never actually get executed either way.) - w.write('def', () => chunksWritten++); + w.write('def', onWrite); } if (useEnd) { // Again, test 2 cases: Either we indicate that we want to end the // writable or not. - w.end('ghi', () => chunksWritten++); + w.end('ghi', onWrite); } else { - w.write('ghi', () => chunksWritten++); + w.write('ghi', onWrite); } assert.strictEqual(chunksWritten, 1);