From 7ed1e1256ad085d90fd8f3fb1fb54b42766a1d12 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Dec 2023 13:07:40 +0100 Subject: [PATCH] stream: fix timing relative to promises PR-URL: https://github.com/nodejs/node/pull/51070 --- lib/internal/streams/destroy.js | 41 +++++++++++++++++++++------- test/parallel/test-stream-destroy.js | 20 ++++++++++++++ 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 28802cae5eff32..e85fc1fdbf13bd 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -112,9 +112,13 @@ function _destroy(self, err, cb) { } if (err) { - process.nextTick(emitErrorCloseNT, self, err); + queueMicrotask(() => { + emitErrorCloseNT(self, err); + }); } else { - process.nextTick(emitCloseNT, self); + queueMicrotask(() => { + emitCloseNT(self); + }); } } try { @@ -233,7 +237,9 @@ function errorOrDestroy(stream, err, sync) { r.errored = err; } if (sync) { - process.nextTick(emitErrorNT, stream, err); + queueMicrotask(() => { + emitErrorNT(stream, err); + }); } else { emitErrorNT(stream, err); } @@ -262,7 +268,9 @@ function construct(stream, cb) { return; } - process.nextTick(constructNT, stream); + queueMicrotask(() => { + constructNT(stream); + }); } function constructNT(stream) { @@ -291,16 +299,22 @@ function constructNT(stream) { } else if (err) { errorOrDestroy(stream, err, true); } else { - process.nextTick(emitConstructNT, stream); + queueMicrotask(() => { + emitConstructNT(stream); + }); } } try { stream._construct((err) => { - process.nextTick(onConstruct, err); + queueMicrotask(() =>{ + onConstruct(err); + }); }); } catch (err) { - process.nextTick(onConstruct, err); + queueMicrotask(() => { + onConstruct(err); + }); } } @@ -318,11 +332,14 @@ function emitCloseLegacy(stream) { function emitErrorCloseLegacy(stream, err) { stream.emit('error', err); - process.nextTick(emitCloseLegacy, stream); + queueMicrotask(() => { + emitCloseLegacy(stream); + }); } // Normalize destroy for legacy. function destroyer(stream, err) { + process._rawDebug("### 0") if (!stream || isDestroyed(stream)) { return; } @@ -345,9 +362,13 @@ function destroyer(stream, err) { // TODO: Don't lose err? stream.close(); } else if (err) { - process.nextTick(emitErrorCloseLegacy, stream, err); + queueMicrotask(() => { + emitErrorCloseLegacy(stream, err); + }); } else { - process.nextTick(emitCloseLegacy, stream); + queueMicrotask(() => { + emitCloseLegacy(stream); + }); } if (!stream.destroyed) { diff --git a/test/parallel/test-stream-destroy.js b/test/parallel/test-stream-destroy.js index 5269ccfec50271..2e12539c2208ca 100644 --- a/test/parallel/test-stream-destroy.js +++ b/test/parallel/test-stream-destroy.js @@ -118,3 +118,23 @@ const http = require('http'); req.end('asd'); }); } + +{ + // Destroy timing relative to Promise + + new Promise(resolve => { + const r = new Readable({ read() {} }); + destroy(r, new Error('asd')); + resolve(r); + }).then(common.mustCall(r => { + r.on('error', common.mustCall()); + })); + + new Promise(resolve => { + const r = new Readable({ read() {} }); + r.destroy(new Error('asd')); + resolve(r); + }).then(common.mustCall(r => { + r.on('error', common.mustCall()); + })); +}