Skip to content

Commit

Permalink
stream: fix multiple destroy calls
Browse files Browse the repository at this point in the history
Previously destroy could be called multiple times causing inconsistent
and hard to predict behavior. Furthermore, since the stream _destroy
implementation can only be called once, the behavior of applying destroy
multiple times becomes unclear.

This changes so that only the first destroy() call is executed and any
subsequent calls are noops.

PR-URL: #29197
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
  • Loading branch information
ronag committed Feb 29, 2020
1 parent 8b1efe0 commit 311e12b
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 73 deletions.
11 changes: 11 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ This is a destructive and immediate way to destroy a stream. Previous calls to
`write()` may not have drained, and may trigger an `ERR_STREAM_DESTROYED` error.
Use `end()` instead of destroy if data should flush before close, or wait for
the `'drain'` event before destroying the stream.

Once `destroy()` has been called any further calls will be a noop and no
further errors except from `_destroy` may be emitted as `'error'`.

Implementors should not override this method,
but instead implement [`writable._destroy()`][writable-_destroy].

Expand Down Expand Up @@ -953,6 +957,10 @@ Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`
event (unless `emitClose` is set to `false`). After this call, the readable
stream will release any internal resources and subsequent calls to `push()`
will be ignored.

Once `destroy()` has been called any further calls will be a noop and no
further errors except from `_destroy` may be emitted as `'error'`.

Implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].

Expand Down Expand Up @@ -1484,6 +1492,9 @@ Implementors should not override this method, but instead implement
The default implementation of `_destroy()` for `Transform` also emit `'close'`
unless `emitClose` is set in false.

Once `destroy()` has been called any further calls will be a noop and no
further errors except from `_destroy` may be emitted as `'error'`.

### `stream.finished(stream[, options], callback)`
<!-- YAML
added: v10.0.0
Expand Down
2 changes: 1 addition & 1 deletion lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,7 @@ exports.connect = function connect(...args) {
tlssock._start();

tlssock.on('secure', onConnectSecure);
tlssock.once('end', onConnectEnd);
tlssock.prependListener('end', onConnectEnd);

return tlssock;
};
2 changes: 1 addition & 1 deletion lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ WriteStream.prototype._writev = function(data, cb) {

if (er) {
if (this.autoClose) {
this.destroy();
this.destroy(er);
}
return cb(er);
}
Expand Down
33 changes: 17 additions & 16 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;

if ((w && w.destroyed) || (r && r.destroyed)) {
if (typeof cb === 'function') {
// TODO(ronag): Invoke with `'close'`/`'error'`.
cb();
}

return this;
}

if (err) {
if (w) {
w.errored = true;
Expand All @@ -16,16 +25,6 @@ function destroy(err, cb) {
}
}

if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
} else if (err) {
process.nextTick(emitErrorNT, this, err);
}

return this;
}

// We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks

Expand Down Expand Up @@ -53,13 +52,11 @@ function destroy(err, cb) {
r.closed = true;
}

if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
if (typeof cb === 'function') {
cb(err);
// Don't emit 'error' if passed a callback.
process.nextTick(emitCloseNT, this);
} else if (err) {
}

if (err) {
process.nextTick(emitErrorCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
Expand Down Expand Up @@ -138,6 +135,10 @@ function errorOrDestroy(stream, err, sync) {
const r = stream._readableState;
const w = stream._writableState;

if ((w && w.destroyed) || (r && r.destroyed)) {
return this;
}

if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (err) {
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-file-write-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ file

callbacks.close++;
console.error('write after end should not be allowed');
file.write('should not work anymore');
file.on('error', common.expectsError({
file.write('should not work anymore', common.expectsError({
code: 'ERR_STREAM_WRITE_AFTER_END',
name: 'Error',
message: 'write after end'
Expand Down
13 changes: 4 additions & 9 deletions test/parallel/test-file-write-stream2.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const filepath = path.join(tmpdir.path, 'write.txt');

const EXPECTED = '012345678910';

const cb_expected = 'write open drain write drain close error ';
const cb_expected = 'write open drain write drain close ';
let cb_occurred = '';

let countDrains = 0;
Expand Down Expand Up @@ -92,16 +92,11 @@ file.on('drain', function() {
file.on('close', function() {
cb_occurred += 'close ';
assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);
file.write('should not work anymore');
file.write('should not work anymore', (err) => {
assert.ok(err.message.includes('write after end'));
});
});


file.on('error', function(err) {
cb_occurred += 'error ';
assert.ok(err.message.includes('write after end'));
});


for (let i = 0; i < 11; i++) {
const ret = file.write(String(i));
console.error(`${i} ${ret}`);
Expand Down
8 changes: 3 additions & 5 deletions test/parallel/test-http2-server-stream-session-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ server.on('stream', common.mustCall((stream) => {
name: 'Error'
}
);
stream.on('error', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_WRITE_AFTER_END',
message: 'write after end'
}));
// When session is detroyed all streams are destroyed and no further
// error should be emitted.
stream.on('error', common.mustNotCall());
assert.strictEqual(stream.write('data', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_WRITE_AFTER_END',
Expand Down
6 changes: 1 addition & 5 deletions test/parallel/test-net-socket-destroy-send.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ server.listen(0, common.mustCall(function() {
conn.on('connect', common.mustCall(function() {
// Test destroy returns this, even on multiple calls when it short-circuits.
assert.strictEqual(conn, conn.destroy().destroy());
conn.on('error', common.expectsError({
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed',
name: 'Error'
}));
conn.on('error', common.mustNotCall());

conn.write(Buffer.from('kaboom'), common.expectsError({
code: 'ERR_STREAM_DESTROYED',
Expand Down
5 changes: 2 additions & 3 deletions test/parallel/test-stream-catch-rejections.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ const assert = require('assert');
const r = new stream.Readable({
captureRejections: true,
read() {
this.push('hello');
this.push('world');
this.push(null);
}
});
r.push('hello');
r.push('world');

const err = new Error('kaboom');

Expand Down
4 changes: 1 addition & 3 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,7 @@ const { promisify } = require('util');
res,
stream,
common.mustCall((err) => {
assert.ok(err);
// TODO(ronag):
// assert.strictEqual(err.message, 'oh no');
assert.strictEqual(err.message, 'oh no');
server.close();
})
);
Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ const assert = require('assert');

let ticked = false;
read.on('close', common.mustCall(() => {
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errorEmitted, true);
assert.strictEqual(ticked, true);
}));
// 'error' should not be emitted since a callback is passed to
// destroy(err, callback);
read.on('error', common.mustNotCall());
read.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));

assert.strictEqual(read._readableState.errored, false);
assert.strictEqual(read._readableState.errorEmitted, false);
Expand Down Expand Up @@ -217,7 +217,7 @@ const assert = require('assert');
}));
readable.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.message, 'kaboom 2');
assert.strictEqual(err.message, 'kaboom 1');
assert.strictEqual(readable._readableState.errorEmitted, true);
}));

Expand All @@ -230,7 +230,7 @@ const assert = require('assert');
// the `_destroy()` callback is called.
readable.destroy(new Error('kaboom 2'));
assert.strictEqual(readable._readableState.errorEmitted, false);
assert.strictEqual(readable._readableState.errored, true);
assert.strictEqual(readable._readableState.errored, false);

ticked = true;
}
Expand Down
22 changes: 8 additions & 14 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,14 @@ const assert = require('assert');

let ticked = false;
writable.on('close', common.mustCall(() => {
writable.on('error', common.mustNotCall());
writable.destroy(new Error('hello'));
assert.strictEqual(ticked, true);
assert.strictEqual(writable._writableState.errorEmitted, true);
}));
writable.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.message, 'kaboom 2');
assert.strictEqual(err.message, 'kaboom 1');
assert.strictEqual(writable._writableState.errorEmitted, true);
}));

Expand All @@ -205,7 +207,7 @@ const assert = require('assert');
// the `_destroy()` callback is called.
writable.destroy(new Error('kaboom 2'));
assert.strictEqual(writable._writableState.errorEmitted, false);
assert.strictEqual(writable._writableState.errored, true);
assert.strictEqual(writable._writableState.errored, false);

ticked = true;
}
Expand Down Expand Up @@ -246,8 +248,8 @@ const assert = require('assert');

const expected = new Error('kaboom');

write.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(err, expected);
write.destroy(expected, common.mustCall((err) => {
assert.strictEqual(err, undefined);
}));
}

Expand All @@ -271,11 +273,7 @@ const assert = require('assert');
const write = new Writable();

write.destroy();
write.on('error', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed'
}));
write.on('error', common.mustNotCall());
write.write('asd', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_DESTROYED',
Expand All @@ -288,11 +286,7 @@ const assert = require('assert');
write(chunk, enc, cb) { cb(); }
});

write.on('error', common.expectsError({
name: 'Error',
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed'
}));
write.on('error', common.mustNotCall());

write.cork();
write.write('asd', common.mustCall());
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-stream-writable-writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const { Writable } = require('stream');
w.write('asd');
assert.strictEqual(w.writable, false);
w.on('error', common.mustCall());
w.destroy();
}

{
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-stream-writable-write-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ function test(autoDestroy) {
_write() {}
});
w.destroy();
expectError(w, 'asd', 'ERR_STREAM_DESTROYED');
}

{
Expand Down
5 changes: 5 additions & 0 deletions test/parallel/test-tls-wrap-econnreset-localaddress.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const server = net.createServer((c) => {
}).listen(common.mustCall(() => {
const port = server.address().port;

let errored = false;
tls.connect({
port: port,
localAddress: common.localhostIPv4
Expand All @@ -24,5 +25,9 @@ const server = net.createServer((c) => {
assert.strictEqual(e.port, port);
assert.strictEqual(e.localAddress, common.localhostIPv4);
server.close();
errored = true;
}))
.on('close', common.mustCall(() => {
assert.strictEqual(errored, true);
}));
}));
5 changes: 5 additions & 0 deletions test/parallel/test-tls-wrap-econnreset-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ if (process.argv[2] !== 'child') {
const server = net.createServer((c) => {
c.end();
}).listen(common.PIPE, common.mustCall(() => {
let errored = false;
tls.connect({ path: common.PIPE })
.once('error', common.mustCall((e) => {
assert.strictEqual(e.code, 'ECONNRESET');
Expand All @@ -39,5 +40,9 @@ const server = net.createServer((c) => {
assert.strictEqual(e.host, undefined);
assert.strictEqual(e.localAddress, undefined);
server.close();
errored = true;
}))
.on('close', common.mustCall(() => {
assert.strictEqual(errored, true);
}));
}));
5 changes: 5 additions & 0 deletions test/parallel/test-tls-wrap-econnreset-socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ const server = net.createServer((c) => {

const socket = new net.Socket();

let errored = false;
tls.connect({ socket })
.once('error', common.mustCall((e) => {
assert.strictEqual(e.code, 'ECONNRESET');
assert.strictEqual(e.path, undefined);
assert.strictEqual(e.host, undefined);
assert.strictEqual(e.port, undefined);
assert.strictEqual(e.localAddress, undefined);
errored = true;
server.close();
}))
.on('close', common.mustCall(() => {
assert.strictEqual(errored, true);
}));

socket.connect(port);
Expand Down
5 changes: 5 additions & 0 deletions test/parallel/test-tls-wrap-econnreset.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const server = net.createServer((c) => {
}).listen(common.mustCall(() => {
const port = server.address().port;

let errored = false;
tls.connect(port, common.localhostIPv4)
.once('error', common.mustCall((e) => {
assert.strictEqual(e.code, 'ECONNRESET');
Expand All @@ -21,5 +22,9 @@ const server = net.createServer((c) => {
assert.strictEqual(e.port, port);
assert.strictEqual(e.localAddress, undefined);
server.close();
errored = true;
}))
.on('close', common.mustCall(() => {
assert.strictEqual(errored, true);
}));
}));
Loading

0 comments on commit 311e12b

Please sign in to comment.