From 75b30c606c9b18fdb2634e8fe5e2ca5e9a889286 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 28 Sep 2019 07:33:47 +0200 Subject: [PATCH] stream: emit 'error' asynchronously errorOrDestroy emits 'error' synchronously due to compat reasons. However, it should be possible to use correct async behaviour for new code. PR-URL: https://github.com/nodejs/node/pull/29744 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Anna Henningsen Reviewed-By: Rich Trott --- doc/api/stream.md | 3 +- lib/_stream_writable.js | 81 ++++++++----------- lib/internal/streams/destroy.js | 9 ++- .../test-child-process-server-close.js | 2 +- test/parallel/test-file-write-stream.js | 23 ++---- test/parallel/test-net-socket-write-error.js | 16 ++-- test/parallel/test-net-write-arguments.js | 18 +++-- test/parallel/test-stream-writable-destroy.js | 2 +- .../test-stream-writable-end-multiple.js | 4 +- test/parallel/test-stream-writable-null.js | 30 +++---- .../test-stream-writable-write-error.js | 59 ++++++++++++++ test/parallel/test-zlib-object-write.js | 13 +-- test/parallel/test-zlib-write-after-close.js | 19 ++--- 13 files changed, 163 insertions(+), 116 deletions(-) create mode 100644 test/parallel/test-stream-writable-write-error.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 9337077ef4a108..650a164775776f 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1853,7 +1853,8 @@ methods only. The `callback` method must be called to signal either that the write completed successfully or failed with an error. The first argument passed to the `callback` must be the `Error` object if the call failed or `null` if the -write succeeded. +write succeeded. The `callback` method will always be called asynchronously and +before `'error'` is emitted. All calls to `writable.write()` that occur between the time `writable._write()` is called and the `callback` is called will cause the written data to be diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 8470da8b816e6f..8d427f1afe1da6 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -265,33 +265,6 @@ Writable.prototype.pipe = function() { errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); }; - -function writeAfterEnd(stream, cb) { - const er = new ERR_STREAM_WRITE_AFTER_END(); - // TODO: defer error events consistently everywhere, not just the cb - errorOrDestroy(stream, er); - process.nextTick(cb, er); -} - -// Checks that a user-supplied chunk is valid, especially for the particular -// mode the stream is in. Currently this means that `null` is never accepted -// and undefined/non-string values are only allowed in object mode. -function validChunk(stream, state, chunk, cb) { - var er; - - if (chunk === null) { - er = new ERR_STREAM_NULL_VALUES(); - } else if (typeof chunk !== 'string' && !state.objectMode) { - er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); - } - if (er) { - errorOrDestroy(stream, er); - process.nextTick(cb, er); - return false; - } - return true; -} - Writable.prototype.write = function(chunk, encoding, cb) { const state = this._writableState; var ret = false; @@ -315,17 +288,25 @@ Writable.prototype.write = function(chunk, encoding, cb) { if (typeof cb !== 'function') cb = nop; + let err; if (state.ending) { - writeAfterEnd(this, cb); + err = new ERR_STREAM_WRITE_AFTER_END(); } else if (state.destroyed) { - const err = new ERR_STREAM_DESTROYED('write'); - process.nextTick(cb, err); - errorOrDestroy(this, err); - } else if (isBuf || validChunk(this, state, chunk, cb)) { + err = new ERR_STREAM_DESTROYED('write'); + } else if (chunk === null) { + err = new ERR_STREAM_NULL_VALUES(); + } else if (!isBuf && typeof chunk !== 'string' && !state.objectMode) { + err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); + } else { state.pendingcb++; ret = writeOrBuffer(this, state, chunk, encoding, cb); } + if (err) { + process.nextTick(cb, err); + errorOrDestroy(this, err, true); + } + return ret; }; @@ -629,7 +610,7 @@ Writable.prototype._write = function(chunk, encoding, cb) { if (this._writev) { this._writev([{ chunk, encoding }], cb); } else { - cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()')); + process.nextTick(cb, new ERR_METHOD_NOT_IMPLEMENTED('_write()')); } }; @@ -656,15 +637,25 @@ Writable.prototype.end = function(chunk, encoding, cb) { this.uncork(); } + if (typeof cb !== 'function') + cb = nop; + // Ignore unnecessary end() calls. - if (!state.ending) { + // TODO(ronag): Compat. Allow end() after destroy(). + if (!state.errored && !state.ending) { endWritable(this, state, cb); - } else if (typeof cb === 'function') { - if (!state.finished) { - onFinished(this, state, cb); - } else { - cb(new ERR_STREAM_ALREADY_FINISHED('end')); - } + } else if (state.finished) { + const err = new ERR_STREAM_ALREADY_FINISHED('end'); + process.nextTick(cb, err); + // TODO(ronag): Compat. Don't error the stream. + // errorOrDestroy(this, err, true); + } else if (state.destroyed) { + const err = new ERR_STREAM_DESTROYED('end'); + process.nextTick(cb, err); + // TODO(ronag): Compat. Don't error the stream. + // errorOrDestroy(this, err, true); + } else if (cb !== nop) { + onFinished(this, state, cb); } return this; @@ -749,7 +740,7 @@ function finish(stream, state) { function endWritable(stream, state, cb) { state.ending = true; finishMaybe(stream, state, true); - if (cb) { + if (cb !== nop) { if (state.finished) process.nextTick(cb); else @@ -774,14 +765,6 @@ function onCorkedFinish(corkReq, state, err) { } function onFinished(stream, state, cb) { - if (state.destroyed && state.errorEmitted) { - // TODO(ronag): Backwards compat. Should be moved to end() without - // errorEmitted check and with errorOrDestroy. - const err = new ERR_STREAM_DESTROYED('end'); - process.nextTick(cb, err); - return; - } - function onerror(err) { stream.removeListener('finish', onfinish); stream.removeListener('error', onerror); diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index fb206c6c83d0a0..4df56b10f09dd8 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -119,7 +119,7 @@ function undestroy() { } } -function errorOrDestroy(stream, err) { +function errorOrDestroy(stream, err, sync) { // We have tests that rely on errors being emitted // in the same tick, so changing this is semver major. // For now when you opt-in to autoDestroy we allow @@ -138,7 +138,12 @@ function errorOrDestroy(stream, err) { if (r) { r.errored = true; } - emitErrorNT(stream, err); + + if (sync) { + process.nextTick(emitErrorNT, stream, err); + } else { + emitErrorNT(stream, err); + } } } diff --git a/test/parallel/test-child-process-server-close.js b/test/parallel/test-child-process-server-close.js index 8ad32b4bbfcbd9..832cf970178afa 100644 --- a/test/parallel/test-child-process-server-close.js +++ b/test/parallel/test-child-process-server-close.js @@ -32,7 +32,7 @@ const server = net.createServer((conn) => { })); }).listen(common.PIPE, () => { const client = net.connect(common.PIPE, common.mustCall()); - client.on('data', () => { + client.once('data', () => { client.end(() => { server.close(); }); diff --git a/test/parallel/test-file-write-stream.js b/test/parallel/test-file-write-stream.js index 05a2d5432b5fb7..1055fac698fa92 100644 --- a/test/parallel/test-file-write-stream.js +++ b/test/parallel/test-file-write-stream.js @@ -20,7 +20,7 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. 'use strict'; -require('../common'); +const common = require('../common'); const assert = require('assert'); const path = require('path'); @@ -46,9 +46,6 @@ file callbacks.open++; assert.strictEqual(typeof fd, 'number'); }) - .on('error', function(err) { - throw err; - }) .on('drain', function() { console.error('drain!', callbacks.drain); callbacks.drain++; @@ -65,17 +62,13 @@ file assert.strictEqual(file.bytesWritten, EXPECTED.length * 2); callbacks.close++; - assert.throws( - () => { - console.error('write after end should not be allowed'); - file.write('should not work anymore'); - }, - { - code: 'ERR_STREAM_WRITE_AFTER_END', - name: 'Error', - message: 'write after end' - } - ); + console.error('write after end should not be allowed'); + file.write('should not work anymore'); + file.on('error', common.expectsError({ + code: 'ERR_STREAM_WRITE_AFTER_END', + name: 'Error', + message: 'write after end' + })); fs.unlinkSync(fn); }); diff --git a/test/parallel/test-net-socket-write-error.js b/test/parallel/test-net-socket-write-error.js index c324aea47a442a..ab748480ea3c52 100644 --- a/test/parallel/test-net-socket-write-error.js +++ b/test/parallel/test-net-socket-write-error.js @@ -1,18 +1,20 @@ 'use strict'; -require('../common'); -const assert = require('assert'); +const common = require('../common'); const net = require('net'); const server = net.createServer().listen(0, connectToServer); function connectToServer() { const client = net.createConnection(this.address().port, () => { - assert.throws(() => client.write(1337), - { - code: 'ERR_INVALID_ARG_TYPE', - name: 'TypeError' - }); + client.write(1337, common.expectsError({ + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError' + })); + client.on('error', common.expectsError({ + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError' + })); client.destroy(); }) diff --git a/test/parallel/test-net-write-arguments.js b/test/parallel/test-net-write-arguments.js index d3dde36b02f852..8d864e546647d3 100644 --- a/test/parallel/test-net-write-arguments.js +++ b/test/parallel/test-net-write-arguments.js @@ -1,17 +1,21 @@ 'use strict'; const common = require('../common'); -const assert = require('assert'); const net = require('net'); const socket = net.Stream({ highWaterMark: 0 }); // Make sure that anything besides a buffer or a string throws. -assert.throws(() => socket.write(null), - { - code: 'ERR_STREAM_NULL_VALUES', - name: 'TypeError', - message: 'May not write null values to stream' - }); +socket.write(null, common.expectsError({ + code: 'ERR_STREAM_NULL_VALUES', + name: 'TypeError', + message: 'May not write null values to stream' +})); +socket.on('error', common.expectsError({ + code: 'ERR_STREAM_NULL_VALUES', + name: 'TypeError', + message: 'May not write null values to stream' +})); + [ true, false, diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index c93478c01c45fa..2a9a1965adbe37 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -341,7 +341,7 @@ const assert = require('assert'); write.destroy(); let ticked = false; write.end(common.mustCall((err) => { - assert.strictEqual(ticked, false); + assert.strictEqual(ticked, true); assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); })); ticked = true; diff --git a/test/parallel/test-stream-writable-end-multiple.js b/test/parallel/test-stream-writable-end-multiple.js index a94676ab8ab366..000f5b07f594f6 100644 --- a/test/parallel/test-stream-writable-end-multiple.js +++ b/test/parallel/test-stream-writable-end-multiple.js @@ -6,7 +6,6 @@ const assert = require('assert'); const stream = require('stream'); const writable = new stream.Writable(); - writable._write = (chunk, encoding, cb) => { setTimeout(() => cb(), 10); }; @@ -14,7 +13,10 @@ writable._write = (chunk, encoding, cb) => { writable.end('testing ended state', common.mustCall()); writable.end(common.mustCall()); writable.on('finish', common.mustCall(() => { + let ticked = false; writable.end(common.mustCall((err) => { + assert.strictEqual(ticked, true); assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); })); + ticked = true; })); diff --git a/test/parallel/test-stream-writable-null.js b/test/parallel/test-stream-writable-null.js index 7b5c35ff1cca87..f26fc62328cfea 100644 --- a/test/parallel/test-stream-writable-null.js +++ b/test/parallel/test-stream-writable-null.js @@ -1,5 +1,5 @@ 'use strict'; -require('../common'); +const common = require('../common'); const assert = require('assert'); const stream = require('stream'); @@ -14,33 +14,29 @@ class MyWritable extends stream.Writable { } } -assert.throws( - () => { - const m = new MyWritable({ objectMode: true }); - m.write(null, (err) => assert.ok(err)); - }, - { +{ + const m = new MyWritable({ objectMode: true }); + m.write(null, (err) => assert.ok(err)); + m.on('error', common.expectsError({ code: 'ERR_STREAM_NULL_VALUES', name: 'TypeError', message: 'May not write null values to stream' - } -); + })); +} { // Should not throw. const m = new MyWritable({ objectMode: true }).on('error', assert); m.write(null, assert); } -assert.throws( - () => { - const m = new MyWritable(); - m.write(false, (err) => assert.ok(err)); - }, - { +{ + const m = new MyWritable(); + m.write(false, (err) => assert.ok(err)); + m.on('error', common.expectsError({ code: 'ERR_INVALID_ARG_TYPE', name: 'TypeError' - } -); + })); +} { // Should not throw. const m = new MyWritable().on('error', assert); diff --git a/test/parallel/test-stream-writable-write-error.js b/test/parallel/test-stream-writable-write-error.js new file mode 100644 index 00000000000000..e23b24a19df487 --- /dev/null +++ b/test/parallel/test-stream-writable-write-error.js @@ -0,0 +1,59 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { Writable } = require('stream'); + +function expectError(w, arg, code) { + let errorCalled = false; + let ticked = false; + w.write(arg, common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(errorCalled, false); + assert.strictEqual(err.code, code); + })); + ticked = true; + w.on('error', common.mustCall((err) => { + errorCalled = true; + assert.strictEqual(err.code, code); + })); +} + +function test(autoDestroy) { + { + const w = new Writable({ + autoDestroy, + _write() {} + }); + w.end(); + expectError(w, 'asd', 'ERR_STREAM_WRITE_AFTER_END'); + } + + { + const w = new Writable({ + autoDestroy, + _write() {} + }); + w.destroy(); + expectError(w, 'asd', 'ERR_STREAM_DESTROYED'); + } + + { + const w = new Writable({ + autoDestroy, + _write() {} + }); + expectError(w, null, 'ERR_STREAM_NULL_VALUES'); + } + + { + const w = new Writable({ + autoDestroy, + _write() {} + }); + expectError(w, {}, 'ERR_INVALID_ARG_TYPE'); + } +} + +test(false); +test(true); diff --git a/test/parallel/test-zlib-object-write.js b/test/parallel/test-zlib-object-write.js index 98f34b38bed850..df533d77b3fcdd 100644 --- a/test/parallel/test-zlib-object-write.js +++ b/test/parallel/test-zlib-object-write.js @@ -1,11 +1,12 @@ 'use strict'; -require('../common'); -const assert = require('assert'); +const common = require('../common'); const { Gunzip } = require('zlib'); const gunzip = new Gunzip({ objectMode: true }); -assert.throws( - () => gunzip.write({}), - TypeError -); +gunzip.write({}, common.expectsError({ + name: 'TypeError' +})); +gunzip.on('error', common.expectsError({ + name: 'TypeError' +})); diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js index 06a7d3f1917543..d67abee0ac3d58 100644 --- a/test/parallel/test-zlib-write-after-close.js +++ b/test/parallel/test-zlib-write-after-close.js @@ -21,18 +21,19 @@ 'use strict'; const common = require('../common'); -const assert = require('assert'); const zlib = require('zlib'); zlib.gzip('hello', common.mustCall(function(err, out) { const unzip = zlib.createGunzip(); unzip.close(common.mustCall()); - assert.throws( - () => unzip.write(out), - { - code: 'ERR_STREAM_DESTROYED', - name: 'Error', - message: 'Cannot call write after a stream was destroyed' - } - ); + unzip.write(out, common.expectsError({ + code: 'ERR_STREAM_DESTROYED', + name: 'Error', + message: 'Cannot call write after a stream was destroyed' + })); + unzip.on('error', common.expectsError({ + code: 'ERR_STREAM_DESTROYED', + name: 'Error', + message: 'Cannot call write after a stream was destroyed' + })); }));