diff --git a/doc/api/stream.md b/doc/api/stream.md index db2f5ccc255326..a17a0c455d7bf8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -499,6 +499,15 @@ write('hello', () => { A Writable stream in object mode will always ignore the `encoding` argument. +##### writable.destroy([error]) + + +Destroy the stream, and emit the passed error. After this call, the +writible stream has ended. Implementors should not override this method, +but instead implement [`writable._destroy`][writable-_destroy]. + ### Readable Streams Readable streams are an abstraction for a *source* from which data is @@ -1070,6 +1079,16 @@ myReader.on('readable', () => { }); ``` +##### readable.destroy([error]) + + +Destroy the stream, and emit `'error'`. After this call, the +readable stream will release any internal resources. +Implementors should not override this method, but instead implement +[`readable._destroy`][readable-_destroy]. + ### Duplex and Transform Streams #### Class: stream.Duplex @@ -1109,6 +1128,16 @@ Examples of Transform streams include: * [zlib streams][zlib] * [crypto streams][crypto] +##### transform.destroy([error]) + + +Destroy the stream, and emit `'error'`. After this call, the +transform stream would release any internal resources. +implementors should not override this method, but instead implement +[`readable._destroy`][readable-_destroy]. +The default implementation of `_destroy` for `Transform` also emit `'close'`. ## API for Stream Implementers @@ -1247,6 +1276,8 @@ constructor and implement the `writable._write()` method. The [`stream._write()`][stream-_write] method. * `writev` {Function} Implementation for the [`stream._writev()`][stream-_writev] method. + * `destroy` {Function} Implementation for the + [`stream._destroy()`][writable-_destroy] method. For example: @@ -1356,6 +1387,15 @@ The `writable._writev()` method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs. +#### writable.\_destroy(err, callback) + + +* `err` {Error} An error. +* `callback` {Function} A callback function that takes an optional error argument + which is invoked when the writable is destroyed. + #### Errors While Writing It is recommended that errors occurring during the processing of the @@ -1425,6 +1465,8 @@ constructor and implement the `readable._read()` method. a single value instead of a Buffer of size n. Defaults to `false` * `read` {Function} Implementation for the [`stream._read()`][stream-_read] method. + * `destroy` {Function} Implementation for the [`stream._destroy()`][readable-_destroy] + method. For example: @@ -2073,4 +2115,8 @@ readable buffer so there is nothing for a user to consume. [stream-read]: #stream_readable_read_size [stream-resume]: #stream_readable_resume [stream-write]: #stream_writable_write_chunk_encoding_callback -[zlib]: zlib.html +[readable-_destroy]: #stream_readable_destroy_err_callback +[writable-_destroy]: #stream_writable_destroy_err_callback +[TCP sockets]: net.html#net_class_net_socket +[Transform]: #stream_class_stream_transform +[Writable]: #stream_class_stream_writable diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 4422b62aac3250..7440cd08729e1c 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -76,3 +76,33 @@ function onend() { function onEndNT(self) { self.end(); } + +Object.defineProperty(Duplex.prototype, 'destroyed', { + get() { + if (this._readableState === undefined || + this._writableState === undefined) { + return false; + } + return this._readableState.destroyed && this._writableState.destroyed; + }, + set(value) { + // we ignore the value if the stream + // has not been initialized yet + if (this._readableState === undefined || + this._writableState === undefined) { + return; + } + + // backward compatibility, the user is explicitly + // managing destroyed + this._readableState.destroyed = value; + this._writableState.destroyed = value; + } +}); + +Duplex.prototype._destroy = function(err, cb) { + this.push(null); + this.end(); + + process.nextTick(cb, err); +}; diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 702d87b549b857..8b0d45cc86ccf5 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -30,6 +30,7 @@ const Buffer = require('buffer').Buffer; const util = require('util'); const debug = util.debuglog('stream'); const BufferList = require('internal/streams/BufferList'); +const destroyImpl = require('internal/streams/destroy'); var StringDecoder; util.inherits(Readable, Stream); @@ -99,6 +100,9 @@ function ReadableState(options, stream) { this.readableListening = false; this.resumeScheduled = false; + // has it been destroyed + this.destroyed = false; + // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. @@ -129,12 +133,44 @@ function Readable(options) { // legacy this.readable = true; - if (options && typeof options.read === 'function') - this._read = options.read; + if (options) { + if (typeof options.read === 'function') + this._read = options.read; + + if (typeof options.destroy === 'function') + this._destroy = options.destroy; + } Stream.call(this); } +Object.defineProperty(Readable.prototype, 'destroyed', { + get() { + if (this._readableState === undefined) { + return false; + } + return this._readableState.destroyed; + }, + set(value) { + // we ignore the value if the stream + // has not been initialized yet + if (!this._readableState) { + return; + } + + // backward compatibility, the user is explicitly + // managing destroyed + this._readableState.destroyed = value; + } +}); + +Readable.prototype.destroy = destroyImpl.destroy; +Readable.prototype._undestroy = destroyImpl.undestroy; +Readable.prototype._destroy = function(err, cb) { + this.push(null); + cb(err); +}; + // Manually shove something into the read() buffer. // This returns true if the highWaterMark has not been hit yet, // similar to how Writable.write() returns true if you should diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 8adf3ed12d9384..63f65f34ce41f9 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -194,6 +194,14 @@ Transform.prototype._read = function(n) { }; +Transform.prototype._destroy = function(err, cb) { + Duplex.prototype._destroy.call(this, err, (err2) => { + cb(err2); + this.emit('close'); + }); +}; + + function done(stream, er, data) { if (er) return stream.emit('error', er); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index c3c2ce0710ad18..4e2a19f12c822f 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -32,6 +32,7 @@ const util = require('util'); const internalUtil = require('internal/util'); const Stream = require('stream'); const Buffer = require('buffer').Buffer; +const destroyImpl = require('internal/streams/destroy'); util.inherits(Writable, Stream); @@ -66,6 +67,9 @@ function WritableState(options, stream) { // when 'finish' is emitted this.finished = false; + // has it been destroyed + this.destroyed = false; + // should we decode strings into buffers before passing to _write? // this is here so that some node-core streams can optimize string // handling at a lower level. @@ -192,6 +196,9 @@ function Writable(options) { if (typeof options.writev === 'function') this._writev = options.writev; + + if (typeof options.destroy === 'function') + this._destroy = options.destroy; } Stream.call(this); @@ -563,3 +570,30 @@ function onCorkedFinish(corkReq, state, err) { state.corkedRequestsFree = corkReq; } } + +Object.defineProperty(Writable.prototype, 'destroyed', { + get() { + if (this._writableState === undefined) { + return false; + } + return this._writableState.destroyed; + }, + set(value) { + // we ignore the value if the stream + // has not been initialized yet + if (!this._writableState) { + return; + } + + // backward compatibility, the user is explicitly + // managing destroyed + this._writableState.destroyed = value; + } +}); + +Writable.prototype.destroy = destroyImpl.destroy; +Writable.prototype._undestroy = destroyImpl.undestroy; +Writable.prototype._destroy = function(err, cb) { + this.end(); + cb(err); +}; diff --git a/lib/fs.js b/lib/fs.js index 0f24ee2bb8af3d..de5d6dcfb0fcbf 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1986,11 +1986,10 @@ ReadStream.prototype._read = function(n) { }; -ReadStream.prototype.destroy = function() { - if (this.destroyed) - return; - this.destroyed = true; - this.close(); +ReadStream.prototype._destroy = function(err, cb) { + this.close(function(err2) { + cb(err || err2); + }); }; @@ -2157,7 +2156,7 @@ WriteStream.prototype._writev = function(data, cb) { }; -WriteStream.prototype.destroy = ReadStream.prototype.destroy; +WriteStream.prototype._destroy = ReadStream.prototype._destroy; WriteStream.prototype.close = ReadStream.prototype.close; // There is no shutdown() for files. diff --git a/lib/internal/process/stdio.js b/lib/internal/process/stdio.js index adfe1e8e0b049f..db544b15337142 100644 --- a/lib/internal/process/stdio.js +++ b/lib/internal/process/stdio.js @@ -18,10 +18,12 @@ function setupStdio() { function getStdout() { if (stdout) return stdout; stdout = createWritableStdioStream(1); - stdout.destroy = stdout.destroySoon = function(er) { + stdout.destroySoon = stdout.destroy; + stdout._destroy = function(er, cb) { + // avoid errors if we already emitted const errors = lazyErrors(); er = er || new errors.Error('ERR_STDOUT_CLOSE'); - stdout.emit('error', er); + cb(er); }; if (stdout.isTTY) { process.on('SIGWINCH', () => stdout._refreshSize()); @@ -32,10 +34,12 @@ function setupStdio() { function getStderr() { if (stderr) return stderr; stderr = createWritableStdioStream(2); - stderr.destroy = stderr.destroySoon = function(er) { + stderr.destroySoon = stderr.destroy; + stderr._destroy = function(er, cb) { + // avoid errors if we already emitted const errors = lazyErrors(); er = er || new errors.Error('ERR_STDERR_CLOSE'); - stderr.emit('error', er); + cb(er); }; if (stderr.isTTY) { process.on('SIGWINCH', () => stderr._refreshSize()); diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js new file mode 100644 index 00000000000000..18a2a839843bb9 --- /dev/null +++ b/lib/internal/streams/destroy.js @@ -0,0 +1,65 @@ +'use strict'; + +// undocumented cb() API, needed for core, not for public API +function destroy(err, cb) { + const readableDestroyed = this._readableState && + this._readableState.destroyed; + const writableDestroyed = this._writableState && + this._writableState.destroyed; + + if (readableDestroyed || writableDestroyed) { + if (err && (!this._writableState || !this._writableState.errorEmitted)) { + process.nextTick(emitErrorNT, this, err); + } + return; + } + + // we set destroyed to true before firing error callbacks in order + // to make it re-entrance safe in case destroy() is called within callbacks + + if (this._readableState) { + this._readableState.destroyed = true; + } + + // if this is a duplex stream mark the writable part as destroyed as well + if (this._writableState) { + this._writableState.destroyed = true; + } + + this._destroy(err || null, (err) => { + if (!cb && err) { + process.nextTick(emitErrorNT, this, err); + if (this._writableState) { + this._writableState.errorEmitted = true; + } + } else if (cb) { + cb(err); + } + }); +} + +function undestroy() { + if (this._readableState) { + this._readableState.destroyed = false; + this._readableState.reading = false; + this._readableState.ended = false; + this._readableState.endEmitted = false; + } + + if (this._writableState) { + this._writableState.destroyed = false; + this._writableState.ended = false; + this._writableState.ending = false; + this._writableState.finished = false; + this._writableState.errorEmitted = false; + } +} + +function emitErrorNT(self, err) { + self.emit('error', err); +} + +module.exports = { + destroy, + undestroy +}; diff --git a/lib/net.js b/lib/net.js index 0c2c2dceda977b..a3d778a44886ca 100644 --- a/lib/net.js +++ b/lib/net.js @@ -156,7 +156,7 @@ function normalizeArgs(args) { // called when creating new Socket, or when re-using a closed Socket function initSocketHandle(self) { - self.destroyed = false; + self._undestroy(); self._bytesDispatched = 0; self._sockname = null; @@ -295,7 +295,7 @@ function onSocketFinish() { var err = this._handle.shutdown(req); if (err) - return this._destroy(errnoException(err, 'shutdown')); + return this.destroy(errnoException(err, 'shutdown')); } @@ -481,7 +481,7 @@ Socket.prototype._read = function(n) { this._handle.reading = true; var err = this._handle.readStart(); if (err) - this._destroy(errnoException(err, 'read')); + this.destroy(errnoException(err, 'read')); } }; @@ -526,20 +526,6 @@ Socket.prototype.destroySoon = function() { Socket.prototype._destroy = function(exception, cb) { debug('destroy'); - function fireErrorCallbacks(self, exception, cb) { - if (cb) cb(exception); - if (exception && !self._writableState.errorEmitted) { - process.nextTick(emitErrorNT, self, exception); - self._writableState.errorEmitted = true; - } - } - - if (this.destroyed) { - debug('already destroyed, fire error callbacks'); - fireErrorCallbacks(this, exception, cb); - return; - } - this.connecting = false; this.readable = this.writable = false; @@ -564,11 +550,7 @@ Socket.prototype._destroy = function(exception, cb) { this._sockname = null; } - // we set destroyed to true before firing error callbacks in order - // to make it re-entrance safe in case Socket.prototype.destroy() - // is called within callbacks - this.destroyed = true; - fireErrorCallbacks(this, exception, cb); + cb(exception); if (this._server) { COUNTER_NET_SERVER_CONNECTION_CLOSE(this); @@ -581,12 +563,6 @@ Socket.prototype._destroy = function(exception, cb) { }; -Socket.prototype.destroy = function(exception) { - debug('destroy', exception); - this._destroy(exception); -}; - - // This function is called whenever the handle gets a // buffer, or when there's an error reading. function onread(nread, buffer) { @@ -614,7 +590,7 @@ function onread(nread, buffer) { debug('readStop'); var err = handle.readStop(); if (err) - self._destroy(errnoException(err, 'read')); + self.destroy(errnoException(err, 'read')); } return; } @@ -628,7 +604,7 @@ function onread(nread, buffer) { // Error, possibly EOF. if (nread !== uv.UV_EOF) { - return self._destroy(errnoException(nread, 'read')); + return self.destroy(errnoException(nread, 'read')); } debug('EOF'); @@ -739,7 +715,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { this._unrefTimer(); if (!this._handle) { - this._destroy(new Error('This socket is closed'), cb); + this.destroy(new Error('This socket is closed'), cb); return false; } @@ -771,7 +747,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { } if (err) - return this._destroy(errnoException(err, 'write', req.error), cb); + return this.destroy(errnoException(err, 'write', req.error), cb); this._bytesDispatched += req.bytes; @@ -862,7 +838,7 @@ function afterWrite(status, handle, req, err) { if (status < 0) { var ex = errnoException(status, 'write', req.error); debug('write failure', ex); - self._destroy(ex, req.cb); + self.destroy(ex, req.cb); return; } @@ -896,13 +872,13 @@ function internalConnect( localAddress = localAddress || '::'; err = self._handle.bind6(localAddress, localPort); } else { - self._destroy(new TypeError('Invalid addressType: ' + addressType)); + self.destroy(new TypeError('Invalid addressType: ' + addressType)); return; } if (err) { const ex = exceptionWithHostPort(err, 'bind', localAddress, localPort); - self._destroy(ex); + self.destroy(ex); return; } } @@ -944,7 +920,7 @@ function internalConnect( } const ex = exceptionWithHostPort(err, 'connect', address, port, details); - self._destroy(ex); + self.destroy(ex); } } @@ -971,14 +947,7 @@ Socket.prototype.connect = function() { this.write = Socket.prototype.write; if (this.destroyed) { - this._readableState.reading = false; - this._readableState.ended = false; - this._readableState.endEmitted = false; - this._writableState.ended = false; - this._writableState.ending = false; - this._writableState.finished = false; - this._writableState.errorEmitted = false; - this.destroyed = false; + this._undestroy(); this._handle = null; this._peername = null; this._sockname = null; @@ -1088,8 +1057,7 @@ function lookupAndConnect(self, options) { function connectErrorNT(self, err) { - self.emit('error', err); - self._destroy(); + self.destroy(err); } @@ -1162,7 +1130,7 @@ function afterConnect(status, handle, req, readable, writable) { ex.localAddress = req.localAddress; ex.localPort = req.localPort; } - self._destroy(ex); + self.destroy(ex); } } diff --git a/node.gyp b/node.gyp index f50aea3475598d..bca0b532dc2ca6 100644 --- a/node.gyp +++ b/node.gyp @@ -105,6 +105,7 @@ 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/BufferList.js', 'lib/internal/streams/legacy.js', + 'lib/internal/streams/destroy.js', 'deps/v8/tools/splaytree.js', 'deps/v8/tools/codemap.js', 'deps/v8/tools/consarray.js', diff --git a/test/parallel/test-process-external-stdio-close-spawn.js b/test/parallel/test-process-external-stdio-close-spawn.js new file mode 100644 index 00000000000000..b1ab52ff8a9d9c --- /dev/null +++ b/test/parallel/test-process-external-stdio-close-spawn.js @@ -0,0 +1,31 @@ +'use strict'; +// Refs: https://github.com/nodejs/node/issues/947 +const common = require('../common'); +const assert = require('assert'); +const cp = require('child_process'); + +if (process.argv[2] === 'child') { + process.on('message', common.mustCall((msg) => { + assert.strictEqual(msg, 'go'); + // the following console.log is an integral part + // of the test. If this regress, this call will + // cause the process to exit with 1 + console.log('logging should not cause a crash'); + process.disconnect(); + })); +} else { + // Passing '--inspect', '--inspect-brk' to child.spawn enables + // the debugger. This test was added to help debug the fork-based + // test with the same name. + const child = cp.spawn(process.execPath, [__filename, 'child'], { + stdio: ['pipe', 'pipe', 'pipe', 'ipc'] + }); + + child.on('close', common.mustCall((exitCode, signal) => { + assert.strictEqual(exitCode, 0, 'exit successfully'); + assert.strictEqual(signal, null); + })); + + child.stdout.destroy(); + child.send('go'); +} diff --git a/test/parallel/test-process-external-stdio-close.js b/test/parallel/test-process-external-stdio-close.js index 79e3641bdf06fc..798ee00d0511b0 100644 --- a/test/parallel/test-process-external-stdio-close.js +++ b/test/parallel/test-process-external-stdio-close.js @@ -7,6 +7,9 @@ const cp = require('child_process'); if (process.argv[2] === 'child') { process.on('message', common.mustCall((msg) => { assert.strictEqual(msg, 'go'); + // the following console.log is an integral part + // of the test. If this regress, this call will + // cause the process to exit with 1 console.log('logging should not cause a crash'); process.disconnect(); })); diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js new file mode 100644 index 00000000000000..00e334d64b5693 --- /dev/null +++ b/test/parallel/test-stream-duplex-destroy.js @@ -0,0 +1,194 @@ +'use strict'; + +const common = require('../common'); +const { Duplex } = require('stream'); +const assert = require('assert'); +const { inherits } = require('util'); + +{ + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {} + }); + + duplex.resume(); + + duplex.on('end', common.mustCall()); + duplex.on('finish', common.mustCall()); + + duplex.destroy(); + assert.strictEqual(duplex.destroyed, true); +} + +{ + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {} + }); + duplex.resume(); + + const expected = new Error('kaboom'); + + duplex.on('end', common.mustCall()); + duplex.on('finish', common.mustCall()); + duplex.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + duplex.destroy(expected); + assert.strictEqual(duplex.destroyed, true); +} + +{ + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {} + }); + + duplex._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, expected); + cb(err); + }); + + const expected = new Error('kaboom'); + + duplex.on('finish', common.mustNotCall('no finish event')); + duplex.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + duplex.destroy(expected); + assert.strictEqual(duplex.destroyed, true); +} + +{ + const expected = new Error('kaboom'); + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {}, + destroy: common.mustCall(function(err, cb) { + assert.strictEqual(err, expected); + cb(); + }) + }); + duplex.resume(); + + duplex.on('end', common.mustNotCall('no end event')); + duplex.on('finish', common.mustNotCall('no finish event')); + + // error is swallowed by the custom _destroy + duplex.on('error', common.mustNotCall('no error event')); + + duplex.destroy(expected); + assert.strictEqual(duplex.destroyed, true); +} + +{ + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {} + }); + + duplex._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + cb(); + }); + + duplex.destroy(); + assert.strictEqual(duplex.destroyed, true); +} + +{ + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {} + }); + duplex.resume(); + + duplex._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + process.nextTick(() => { + this.push(null); + this.end(); + cb(); + }); + }); + + const fail = common.mustNotCall('no finish or end event'); + + duplex.on('finish', fail); + duplex.on('end', fail); + + duplex.destroy(); + + duplex.removeListener('end', fail); + duplex.removeListener('finish', fail); + duplex.on('end', common.mustCall()); + duplex.on('finish', common.mustCall()); + assert.strictEqual(duplex.destroyed, true); +} + +{ + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {} + }); + + const expected = new Error('kaboom'); + + duplex._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + cb(expected); + }); + + duplex.on('finish', common.mustNotCall('no finish event')); + duplex.on('end', common.mustNotCall('no end event')); + duplex.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + duplex.destroy(); + assert.strictEqual(duplex.destroyed, true); +} + +{ + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {}, + allowHalfOpen: true + }); + duplex.resume(); + + duplex.on('finish', common.mustCall()); + duplex.on('end', common.mustCall()); + + duplex.destroy(); + assert.strictEqual(duplex.destroyed, true); +} + +{ + const duplex = new Duplex({ + write(chunk, enc, cb) { cb(); }, + read() {}, + }); + + duplex.destroyed = true; + assert.strictEqual(duplex.destroyed, true); + + // the internal destroy() mechanism should not be triggered + duplex.on('finish', common.mustNotCall()); + duplex.on('end', common.mustNotCall()); + duplex.destroy(); +} + +{ + function MyDuplex() { + assert.strictEqual(this.destroyed, false); + this.destroyed = false; + Duplex.call(this); + } + + inherits(MyDuplex, Duplex); + + new MyDuplex(); +} diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js new file mode 100644 index 00000000000000..800b6be0865377 --- /dev/null +++ b/test/parallel/test-stream-readable-destroy.js @@ -0,0 +1,162 @@ +'use strict'; + +const common = require('../common'); +const { Readable } = require('stream'); +const assert = require('assert'); +const { inherits } = require('util'); + +{ + const read = new Readable({ + read() {} + }); + read.resume(); + + read.on('end', common.mustCall()); + + read.destroy(); + assert.strictEqual(read.destroyed, true); +} + +{ + const read = new Readable({ + read() {} + }); + read.resume(); + + const expected = new Error('kaboom'); + + read.on('end', common.mustCall()); + read.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + read.destroy(expected); + assert.strictEqual(read.destroyed, true); +} + +{ + const read = new Readable({ + read() {} + }); + + read._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, expected); + cb(err); + }); + + const expected = new Error('kaboom'); + + read.on('end', common.mustNotCall('no end event')); + read.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + read.destroy(expected); + assert.strictEqual(read.destroyed, true); +} + +{ + const read = new Readable({ + read() {}, + destroy: common.mustCall(function(err, cb) { + assert.strictEqual(err, expected); + cb(); + }) + }); + + const expected = new Error('kaboom'); + + read.on('end', common.mustNotCall('no end event')); + + // error is swallowed by the custom _destroy + read.on('error', common.mustNotCall('no error event')); + + read.destroy(expected); + assert.strictEqual(read.destroyed, true); +} + +{ + const read = new Readable({ + read() {} + }); + + read._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + cb(); + }); + + read.destroy(); + assert.strictEqual(read.destroyed, true); +} + +{ + const read = new Readable({ + read() {} + }); + read.resume(); + + read._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + process.nextTick(() => { + this.push(null); + cb(); + }); + }); + + const fail = common.mustNotCall('no end event'); + + read.on('end', fail); + + read.destroy(); + + read.removeListener('end', fail); + read.on('end', common.mustCall()); + assert.strictEqual(read.destroyed, true); +} + +{ + const read = new Readable({ + read() {} + }); + + const expected = new Error('kaboom'); + + read._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + cb(expected); + }); + + read.on('end', common.mustNotCall('no end event')); + read.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + read.destroy(); + assert.strictEqual(read.destroyed, true); +} + +{ + const read = new Readable({ + read() {} + }); + read.resume(); + + read.destroyed = true; + assert.strictEqual(read.destroyed, true); + + // the internal destroy() mechanism should not be triggered + read.on('end', common.mustNotCall()); + read.destroy(); +} + +{ + function MyReadable() { + assert.strictEqual(this.destroyed, false); + this.destroyed = false; + Readable.call(this); + } + + inherits(MyReadable, Readable); + + new MyReadable(); +} diff --git a/test/parallel/test-stream-transform-destroy.js b/test/parallel/test-stream-transform-destroy.js new file mode 100644 index 00000000000000..c42fe1d6f96d08 --- /dev/null +++ b/test/parallel/test-stream-transform-destroy.js @@ -0,0 +1,143 @@ +'use strict'; + +const common = require('../common'); +const { Transform } = require('stream'); +const assert = require('assert'); + +{ + const transform = new Transform({ + transform(chunk, enc, cb) {} + }); + + transform.resume(); + + transform.on('end', common.mustCall()); + transform.on('close', common.mustCall()); + transform.on('finish', common.mustCall()); + + transform.destroy(); +} + +{ + const transform = new Transform({ + transform(chunk, enc, cb) {} + }); + transform.resume(); + + const expected = new Error('kaboom'); + + transform.on('end', common.mustCall()); + transform.on('finish', common.mustCall()); + transform.on('close', common.mustCall()); + transform.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + transform.destroy(expected); +} + +{ + const transform = new Transform({ + transform(chunk, enc, cb) {} + }); + + transform._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, expected); + cb(err); + }, 1); + + const expected = new Error('kaboom'); + + transform.on('finish', common.mustNotCall('no finish event')); + transform.on('close', common.mustNotCall('no close event')); + transform.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + transform.destroy(expected); +} + +{ + const expected = new Error('kaboom'); + const transform = new Transform({ + transform(chunk, enc, cb) {}, + destroy: common.mustCall(function(err, cb) { + assert.strictEqual(err, expected); + cb(); + }, 1) + }); + transform.resume(); + + transform.on('end', common.mustNotCall('no end event')); + transform.on('close', common.mustNotCall('no close event')); + transform.on('finish', common.mustNotCall('no finish event')); + + // error is swallowed by the custom _destroy + transform.on('error', common.mustNotCall('no error event')); + + transform.destroy(expected); +} + +{ + const transform = new Transform({ + transform(chunk, enc, cb) {} + }); + + transform._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + cb(); + }, 1); + + transform.destroy(); +} + +{ + const transform = new Transform({ + transform(chunk, enc, cb) {} + }); + transform.resume(); + + transform._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + process.nextTick(() => { + this.push(null); + this.end(); + cb(); + }); + }, 1); + + const fail = common.mustNotCall('no event'); + + transform.on('finish', fail); + transform.on('end', fail); + transform.on('close', fail); + + transform.destroy(); + + transform.removeListener('end', fail); + transform.removeListener('finish', fail); + transform.on('end', common.mustCall()); + transform.on('finish', common.mustCall()); +} + +{ + const transform = new Transform({ + transform(chunk, enc, cb) {} + }); + + const expected = new Error('kaboom'); + + transform._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + cb(expected); + }, 1); + + transform.on('close', common.mustNotCall('no close event')); + transform.on('finish', common.mustNotCall('no finish event')); + transform.on('end', common.mustNotCall('no end event')); + transform.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + transform.destroy(); +} diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js new file mode 100644 index 00000000000000..a91f148f9e5cd5 --- /dev/null +++ b/test/parallel/test-stream-writable-destroy.js @@ -0,0 +1,172 @@ +'use strict'; + +const common = require('../common'); +const { Writable } = require('stream'); +const assert = require('assert'); +const { inherits } = require('util'); + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + write.on('finish', common.mustCall()); + + write.destroy(); + assert.strictEqual(write.destroyed, true); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + const expected = new Error('kaboom'); + + write.on('finish', common.mustCall()); + write.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + write.destroy(expected); + assert.strictEqual(write.destroyed, true); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + write._destroy = function(err, cb) { + assert.strictEqual(err, expected); + cb(err); + }; + + const expected = new Error('kaboom'); + + write.on('finish', common.mustNotCall('no finish event')); + write.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + write.destroy(expected); + assert.strictEqual(write.destroyed, true); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); }, + destroy: common.mustCall(function(err, cb) { + assert.strictEqual(err, expected); + cb(); + }) + }); + + const expected = new Error('kaboom'); + + write.on('finish', common.mustNotCall('no finish event')); + + // error is swallowed by the custom _destroy + write.on('error', common.mustNotCall('no error event')); + + write.destroy(expected); + assert.strictEqual(write.destroyed, true); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + write._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + cb(); + }); + + write.destroy(); + assert.strictEqual(write.destroyed, true); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + write._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + process.nextTick(() => { + this.end(); + cb(); + }); + }); + + const fail = common.mustNotCall('no finish event'); + + write.on('finish', fail); + + write.destroy(); + + write.removeListener('finish', fail); + write.on('finish', common.mustCall()); + assert.strictEqual(write.destroyed, true); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + const expected = new Error('kaboom'); + + write._destroy = common.mustCall(function(err, cb) { + assert.strictEqual(err, null); + cb(expected); + }); + + write.on('finish', common.mustNotCall('no finish event')); + write.on('error', common.mustCall((err) => { + assert.strictEqual(err, expected); + })); + + write.destroy(); + assert.strictEqual(write.destroyed, true); +} + +{ + // double error case + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + write.on('error', common.mustCall()); + + write.destroy(new Error('kaboom 1')); + write.destroy(new Error('kaboom 2')); + assert.strictEqual(write._writableState.errorEmitted, true); + assert.strictEqual(write.destroyed, true); +} + +{ + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + + write.destroyed = true; + assert.strictEqual(write.destroyed, true); + + // the internal destroy() mechanism should not be triggered + write.on('finish', common.mustNotCall()); + write.destroy(); +} + +{ + function MyWritable() { + assert.strictEqual(this.destroyed, false); + this.destroyed = false; + Writable.call(this); + } + + inherits(MyWritable, Writable); + + new MyWritable(); +} diff --git a/test/parallel/test-tls-writewrap-leak.js b/test/parallel/test-tls-writewrap-leak.js index b1a6d6a1058a65..0d61f279312dcf 100644 --- a/test/parallel/test-tls-writewrap-leak.js +++ b/test/parallel/test-tls-writewrap-leak.js @@ -16,7 +16,7 @@ const server = net.createServer(common.mustCall((c) => { const c = tls.connect({ port: server.address().port }); c.on('error', () => { // Otherwise `.write()` callback won't be invoked. - c.destroyed = false; + c._undestroy(); }); c.write('hello', common.mustCall((err) => { diff --git a/test/pseudo-tty/test-tty-stdout-end.js b/test/pseudo-tty/test-tty-stdout-end.js index 3c91977f902cd2..2ec7ca9114db6f 100644 --- a/test/pseudo-tty/test-tty-stdout-end.js +++ b/test/pseudo-tty/test-tty-stdout-end.js @@ -1,10 +1,10 @@ 'use strict'; const common = require('../common'); -const assert = require('assert'); -assert.throws(() => process.stdout.end(), - common.expectsError({ - code: 'ERR_STDOUT_CLOSE', - type: Error, - message: 'process.stdout cannot be closed' - })); +process.on('uncaughtException', common.expectsError({ + code: 'ERR_STDOUT_CLOSE', + type: Error, + message: 'process.stdout cannot be closed' +})); + +process.stdout.end();