diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index edf1d37f9fe3bf..5f71a25eefd32f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -224,6 +224,10 @@ function pipelineImpl(streams, callback, opts) { finishImpl(err, --finishCount === 0); } + function finishOnlyHandleError(err) { + finishImpl(err, false); + } + function finishImpl(err, final) { if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { error = err; @@ -273,7 +277,7 @@ function pipelineImpl(streams, callback, opts) { err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE' ) { - finish(err); + finishOnlyHandleError(err); } } stream.on('error', onError); @@ -366,7 +370,7 @@ function pipelineImpl(streams, callback, opts) { } else if (isNodeStream(stream)) { if (isReadableNodeStream(ret)) { finishCount += 2; - const cleanup = pipe(ret, stream, finish, { end }); + const cleanup = pipe(ret, stream, finish, finishOnlyHandleError, { end }); if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(cleanup); } @@ -409,12 +413,12 @@ function pipelineImpl(streams, callback, opts) { return ret; } -function pipe(src, dst, finish, { end }) { +function pipe(src, dst, finish, finishOnlyHandleError, { end }) { let ended = false; dst.on('close', () => { if (!ended) { // Finish if the destination closes before the source has completed. - finish(new ERR_STREAM_PREMATURE_CLOSE()); + finishOnlyHandleError(new ERR_STREAM_PREMATURE_CLOSE()); } }); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 8237fff33b3ac8..5525fffa46d6d0 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1664,5 +1664,47 @@ const tsp = require('timers/promises'); pipeline(r, w, common.mustCall((err) => { assert.strictEqual(err, undefined); })); +} + +{ + // See https://github.com/nodejs/node/issues/51540 for the following 2 tests + const src = new Readable(); + const dst = new Writable({ + destroy(error, cb) { + // Takes a while to destroy + setImmediate(cb); + }, + }); + + pipeline(src, dst, (err) => { + assert.strictEqual(src.closed, true); + assert.strictEqual(dst.closed, true); + assert.strictEqual(err.message, 'problem'); + }); + src.destroy(new Error('problem')); +} +{ + const src = new Readable(); + const dst = new Writable({ + destroy(error, cb) { + // Takes a while to destroy + setImmediate(cb); + }, + }); + const passThroughs = []; + for (let i = 0; i < 10; i++) { + passThroughs.push(new PassThrough()); + } + + pipeline(src, ...passThroughs, dst, (err) => { + assert.strictEqual(src.closed, true); + assert.strictEqual(dst.closed, true); + assert.strictEqual(err.message, 'problem'); + + for (let i = 0; i < passThroughs.length; i++) { + assert.strictEqual(passThroughs[i].closed, true); + } + }); + src.destroy(new Error('problem')); }