diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 28802cae5eff32..cd62723d2d5382 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -25,6 +25,7 @@ const { kAutoDestroy, kErrored, } = require('internal/streams/utils'); +const { queueMicrotask } = require('internal/process/task_queues'); const kDestroy = Symbol('kDestroy'); const kConstruct = Symbol('kConstruct'); @@ -112,9 +113,13 @@ function _destroy(self, err, cb) { } if (err) { - process.nextTick(emitErrorCloseNT, self, err); + queueMicrotask(() => { + emitErrorCloseNT(self, err); + }); } else { - process.nextTick(emitCloseNT, self); + queueMicrotask(() => { + emitCloseNT(self); + }); } } try { @@ -233,7 +238,9 @@ function errorOrDestroy(stream, err, sync) { r.errored = err; } if (sync) { - process.nextTick(emitErrorNT, stream, err); + queueMicrotask(() => { + emitErrorNT(stream, err); + }); } else { emitErrorNT(stream, err); } @@ -262,7 +269,9 @@ function construct(stream, cb) { return; } - process.nextTick(constructNT, stream); + queueMicrotask(() => { + constructNT(stream); + }); } function constructNT(stream) { @@ -291,16 +300,22 @@ function constructNT(stream) { } else if (err) { errorOrDestroy(stream, err, true); } else { - process.nextTick(emitConstructNT, stream); + queueMicrotask(() => { + emitConstructNT(stream); + }); } } try { stream._construct((err) => { - process.nextTick(onConstruct, err); + queueMicrotask(() =>{ + onConstruct(err); + }); }); } catch (err) { - process.nextTick(onConstruct, err); + queueMicrotask(() => { + onConstruct(err); + }); } } @@ -318,11 +333,14 @@ function emitCloseLegacy(stream) { function emitErrorCloseLegacy(stream, err) { stream.emit('error', err); - process.nextTick(emitCloseLegacy, stream); + queueMicrotask(() => { + emitCloseLegacy(stream); + }); } // Normalize destroy for legacy. function destroyer(stream, err) { + process._rawDebug("### 0") if (!stream || isDestroyed(stream)) { return; } @@ -345,9 +363,13 @@ function destroyer(stream, err) { // TODO: Don't lose err? stream.close(); } else if (err) { - process.nextTick(emitErrorCloseLegacy, stream, err); + queueMicrotask(() => { + emitErrorCloseLegacy(stream, err); + }); } else { - process.nextTick(emitCloseLegacy, stream); + queueMicrotask(() => { + emitCloseLegacy(stream); + }); } if (!stream.destroyed) { diff --git a/test/parallel/test-stream-destroy.js b/test/parallel/test-stream-destroy.js index 5269ccfec50271..2e12539c2208ca 100644 --- a/test/parallel/test-stream-destroy.js +++ b/test/parallel/test-stream-destroy.js @@ -118,3 +118,23 @@ const http = require('http'); req.end('asd'); }); } + +{ + // Destroy timing relative to Promise + + new Promise(resolve => { + const r = new Readable({ read() {} }); + destroy(r, new Error('asd')); + resolve(r); + }).then(common.mustCall(r => { + r.on('error', common.mustCall()); + })); + + new Promise(resolve => { + const r = new Readable({ read() {} }); + r.destroy(new Error('asd')); + resolve(r); + }).then(common.mustCall(r => { + r.on('error', common.mustCall()); + })); +}