From fd2fa5a362fa37c40756b78bb1c6de7936fb315a Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 14 Jan 2023 02:28:30 +0530 Subject: [PATCH 01/16] stream: implement finished() for ReadableStream and WritableStream Refs: https://github.com/nodejs/node/issues/39316 --- lib/internal/streams/end-of-stream.js | 23 ++++++++++-- lib/internal/streams/utils.js | 10 +++--- lib/internal/webstreams/readablestream.js | 14 +++++++- lib/internal/webstreams/writablestream.js | 14 +++++++- test/parallel/test-stream-end-of-streams.js | 20 ----------- test/parallel/test-stream-finished.js | 7 +--- test/parallel/test-webstreams-finished.js | 40 +++++++++++++++++++++ 7 files changed, 92 insertions(+), 36 deletions(-) delete mode 100644 test/parallel/test-stream-end-of-streams.js create mode 100644 test/parallel/test-webstreams-finished.js diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index ca42174c86459a..06684f032ce00c 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -22,7 +22,7 @@ const { validateBoolean } = require('internal/validators'); -const { Promise } = primordials; +const { Promise, PromisePrototypeThen } = primordials; const { isClosed, @@ -38,6 +38,15 @@ const { willEmitClose: _willEmitClose, } = require('internal/streams/utils'); +const { + isBrandCheck, +} = require('internal/webstreams/util'); + +const isReadableStream = + isBrandCheck('ReadableStream'); +const isWritableStream = + isBrandCheck('WritableStream'); + function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } @@ -62,8 +71,7 @@ function eos(stream, options, callback) { const writable = options.writable ?? isWritableNodeStream(stream); if (!isNodeStream(stream)) { - // TODO: Webstreams. - throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream); + return eosWeb(stream, options, callback); } const wState = stream._writableState; @@ -255,6 +263,15 @@ function eos(stream, options, callback) { return cleanup; } +function eosWeb(stream, opts, callback) { + PromisePrototypeThen( + stream.streamClosed, + () => callback.call(stream), + (err) => callback.call(stream, err) + ); + return nop; +} + function finished(stream, opts) { let autoCleanup = false; if (opts === null) { diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 4d4f00ab456fa7..e5089e4d4f4983 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -83,7 +83,7 @@ function isWritableEnded(stream) { // Have emitted 'finish'. function isWritableFinished(stream, strict) { - if (!isWritableNodeStream(stream)) return null; + if (!isWritableNodeStream(stream)) return stream?.state === 'closed' ? true : null; if (stream.writableFinished === true) return true; const wState = stream._writableState; if (wState?.errored) return false; @@ -106,7 +106,7 @@ function isReadableEnded(stream) { // Have emitted 'end'. function isReadableFinished(stream, strict) { - if (!isReadableNodeStream(stream)) return null; + if (!isReadableNodeStream(stream)) stream?.state === 'closed' ? true : null; const rState = stream._readableState; if (rState?.errored) return false; if (typeof rState?.endEmitted !== 'boolean') return null; @@ -155,7 +155,7 @@ function isFinished(stream, opts) { function isWritableErrored(stream) { if (!isNodeStream(stream)) { - return null; + return stream?.state === 'errored' ? true : null; } if (stream.writableErrored) { @@ -167,7 +167,7 @@ function isWritableErrored(stream) { function isReadableErrored(stream) { if (!isNodeStream(stream)) { - return null; + return stream?.state === 'errored' ? true : null; } if (stream.readableErrored) { @@ -179,7 +179,7 @@ function isReadableErrored(stream) { function isClosed(stream) { if (!isNodeStream(stream)) { - return null; + return stream?.state === 'closed' ? true : null; } if (typeof stream.closed === 'boolean') { diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 5344785b90cd3e..ecbbaa071e5767 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -231,7 +231,8 @@ class ReadableStream { port1: undefined, port2: undefined, promise: undefined, - } + }, + streamClosed: createDeferredPromise(), }; // The spec requires handling of the strategy first @@ -288,6 +289,12 @@ class ReadableStream { return isReadableStreamLocked(this); } + get streamClosed() { + if (!isReadableStream(this)) + throw new ERR_INVALID_THIS('ReadableStream'); + return this[kState].streamClosed.promise; + } + /** * @param {any} [reason] * @returns { Promise } @@ -1869,6 +1876,7 @@ function readableStreamCancel(stream, reason) { function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; + stream[kState].streamClosed?.resolve?.(); const { reader, @@ -1900,6 +1908,10 @@ function readableStreamError(stream, error) { reader[kState].close.reject(error); setPromiseHandled(reader[kState].close.promise); + if (stream[kState].streamClosed?.promise !== undefined) { + stream[kState].streamClosed?.reject?.(error); + setPromiseHandled(stream[kState].streamClosed?.promise); + } if (readableStreamHasDefaultReader(stream)) { for (let n = 0; n < reader[kState].readRequests.length; n++) diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index ba66cea7a4850d..ba5cf75ca71885 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -175,7 +175,8 @@ class WritableStream { port1: undefined, port2: undefined, promise: undefined, - } + }, + streamClosed: createDeferredPromise(), }; const size = extractSizeAlgorithm(strategy?.size); @@ -201,6 +202,12 @@ class WritableStream { return isWritableStreamLocked(this); } + get streamClosed() { + if (!isWritableStream(this)) + throw new ERR_INVALID_THIS('WritableStream'); + return this[kState].streamClosed.promise; + } + /** * @param {any} reason * @returns {Promise} @@ -733,6 +740,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { writer[kState].close.reject?.(stream[kState].storedError); setPromiseHandled(writer[kState].close.promise); } + if (stream[kState].streamClosed?.promise !== undefined) { + stream[kState].streamClosed.reject?.(stream[kState]?.storedError); + setPromiseHandled(stream[kState].streamClosed?.promise); + } } function writableStreamMarkFirstWriteRequestInFlight(stream) { @@ -839,6 +850,7 @@ function writableStreamFinishInFlightClose(stream) { stream[kState].state = 'closed'; if (stream[kState].writer !== undefined) stream[kState].writer[kState].close.resolve?.(); + stream[kState].streamClosed?.resolve?.(); assert(stream[kState].pendingAbortRequest.abort.promise === undefined); assert(stream[kState].storedError === undefined); } diff --git a/test/parallel/test-stream-end-of-streams.js b/test/parallel/test-stream-end-of-streams.js deleted file mode 100644 index 80a39d052bf8b4..00000000000000 --- a/test/parallel/test-stream-end-of-streams.js +++ /dev/null @@ -1,20 +0,0 @@ -'use strict'; -require('../common'); -const assert = require('assert'); - -const { Duplex, finished } = require('stream'); - -assert.throws( - () => { - // Passing empty object to mock invalid stream - // should throw error - finished({}, () => {}); - }, - { code: 'ERR_INVALID_ARG_TYPE' } -); - -const streamObj = new Duplex(); -streamObj.end(); -// Below code should not throw any errors as the -// streamObj is `Stream` -finished(streamObj, () => {}); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index c7513805e7ac6f..6c850a63a7c56b 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -260,12 +260,7 @@ const http = require('http'); const streamLike = new EE(); streamLike.readableEnded = true; streamLike.readable = true; - assert.throws( - () => { - finished(streamLike, () => {}); - }, - { code: 'ERR_INVALID_ARG_TYPE' } - ); + finished(streamLike, common.mustCall()); streamLike.emit('close'); } diff --git a/test/parallel/test-webstreams-finished.js b/test/parallel/test-webstreams-finished.js new file mode 100644 index 00000000000000..c2c1e03564a6f7 --- /dev/null +++ b/test/parallel/test-webstreams-finished.js @@ -0,0 +1,40 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { ReadableStream, WritableStream } = require('stream/web'); +const { finished } = require('stream'); + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + }, + }); + finished(rs, common.mustSucceed()); + async function test() { + const values = []; + for await (const chunk of rs) { + values.push(chunk); + } + assert.deepStrictEqual(values, ['asd']); + } + test(); +} + +{ + let str = ''; + const ws = new WritableStream({ + write(chunk) { + console.log(chunk); + str += chunk; + } + }); + finished(ws, common.mustSucceed(() => { + assert.strictEqual(str, 'asd'); + })); + const writer = ws.getWriter(); + writer.write('asd'); + writer.close(); +} From 7938b4a99f2938e15a0fae4613fe738d7d745ad9 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 14 Jan 2023 13:39:14 +0530 Subject: [PATCH 02/16] fixup! restore tests --- test/parallel/test-stream-end-of-streams.js | 20 ++++++++++++++++++++ test/parallel/test-stream-finished.js | 7 ++++++- 2 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-stream-end-of-streams.js diff --git a/test/parallel/test-stream-end-of-streams.js b/test/parallel/test-stream-end-of-streams.js new file mode 100644 index 00000000000000..80a39d052bf8b4 --- /dev/null +++ b/test/parallel/test-stream-end-of-streams.js @@ -0,0 +1,20 @@ +'use strict'; +require('../common'); +const assert = require('assert'); + +const { Duplex, finished } = require('stream'); + +assert.throws( + () => { + // Passing empty object to mock invalid stream + // should throw error + finished({}, () => {}); + }, + { code: 'ERR_INVALID_ARG_TYPE' } +); + +const streamObj = new Duplex(); +streamObj.end(); +// Below code should not throw any errors as the +// streamObj is `Stream` +finished(streamObj, () => {}); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 6c850a63a7c56b..c7513805e7ac6f 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -260,7 +260,12 @@ const http = require('http'); const streamLike = new EE(); streamLike.readableEnded = true; streamLike.readable = true; - finished(streamLike, common.mustCall()); + assert.throws( + () => { + finished(streamLike, () => {}); + }, + { code: 'ERR_INVALID_ARG_TYPE' } + ); streamLike.emit('close'); } From f0b8c9229d2289946f6dbf0435a659588890d360 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 14 Jan 2023 15:12:41 +0530 Subject: [PATCH 03/16] fixup! add non-stream check --- lib/internal/streams/end-of-stream.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 06684f032ce00c..97500f1a0b58f5 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -70,6 +70,10 @@ function eos(stream, options, callback) { const readable = options.readable ?? isReadableNodeStream(stream); const writable = options.writable ?? isWritableNodeStream(stream); + if (!isNodeStream(stream) && !isReadableStream(stream) && !isWritableStream(stream)) { + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream); + } + if (!isNodeStream(stream)) { return eosWeb(stream, options, callback); } From 3937b22da94d3d428f630ea492c6ade15f3bd090 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 14 Jan 2023 15:20:03 +0530 Subject: [PATCH 04/16] fixup! fix return --- lib/internal/streams/utils.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index e5089e4d4f4983..8494ca66fcb5eb 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -106,7 +106,7 @@ function isReadableEnded(stream) { // Have emitted 'end'. function isReadableFinished(stream, strict) { - if (!isReadableNodeStream(stream)) stream?.state === 'closed' ? true : null; + if (!isReadableNodeStream(stream)) return stream?.state === 'closed' ? true : null; const rState = stream._readableState; if (rState?.errored) return false; if (typeof rState?.endEmitted !== 'boolean') return null; From 70630b330036b2fc3b6f2a08fad2b37c1d6a0403 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 14 Jan 2023 15:28:38 +0530 Subject: [PATCH 05/16] fixup! remove public property --- lib/internal/streams/end-of-stream.js | 3 ++- lib/internal/webstreams/readablestream.js | 6 ------ lib/internal/webstreams/writablestream.js | 6 ------ 3 files changed, 2 insertions(+), 13 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 97500f1a0b58f5..bff89d702df752 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -40,6 +40,7 @@ const { const { isBrandCheck, + kState, } = require('internal/webstreams/util'); const isReadableStream = @@ -269,7 +270,7 @@ function eos(stream, options, callback) { function eosWeb(stream, opts, callback) { PromisePrototypeThen( - stream.streamClosed, + stream[kState].streamClosed, () => callback.call(stream), (err) => callback.call(stream, err) ); diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index ecbbaa071e5767..8b903ece4317f8 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -289,12 +289,6 @@ class ReadableStream { return isReadableStreamLocked(this); } - get streamClosed() { - if (!isReadableStream(this)) - throw new ERR_INVALID_THIS('ReadableStream'); - return this[kState].streamClosed.promise; - } - /** * @param {any} [reason] * @returns { Promise } diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index ba5cf75ca71885..19e645da1a4b89 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -202,12 +202,6 @@ class WritableStream { return isWritableStreamLocked(this); } - get streamClosed() { - if (!isWritableStream(this)) - throw new ERR_INVALID_THIS('WritableStream'); - return this[kState].streamClosed.promise; - } - /** * @param {any} reason * @returns {Promise} From 2d7ea183bb6c16fc003fc3eaf3a57d9b68027e9a Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 14 Jan 2023 17:32:36 +0530 Subject: [PATCH 06/16] fixup! use the promise and use process.nextTick --- lib/internal/streams/end-of-stream.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index bff89d702df752..1cc100bbe69d5f 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -270,9 +270,9 @@ function eos(stream, options, callback) { function eosWeb(stream, opts, callback) { PromisePrototypeThen( - stream[kState].streamClosed, - () => callback.call(stream), - (err) => callback.call(stream, err) + stream[kState].streamClosed.promise, + () => process.nextTick(callback, stream), + (err) => process.nextTick(callback, stream, err), ); return nop; } From b6461d9ba1d5f8b51bc9eac1c0abd2c0108a3774 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 00:04:54 +0530 Subject: [PATCH 07/16] fixup! fix code style and nullish checks remove --- lib/internal/streams/end-of-stream.js | 16 ++++++++-------- lib/internal/webstreams/readablestream.js | 14 +++++++------- lib/internal/webstreams/writablestream.js | 1 + 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 1cc100bbe69d5f..135bd4e3a0c748 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -68,17 +68,17 @@ function eos(stream, options, callback) { callback = once(callback); - const readable = options.readable ?? isReadableNodeStream(stream); - const writable = options.writable ?? isWritableNodeStream(stream); - - if (!isNodeStream(stream) && !isReadableStream(stream) && !isWritableStream(stream)) { - throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream); + if (isReadableStream(stream) || isWritableStream(stream)) { + return eosWeb(stream, options, callback); } if (!isNodeStream(stream)) { - return eosWeb(stream, options, callback); + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream); } + const readable = options.readable ?? isReadableNodeStream(stream); + const writable = options.writable ?? isWritableNodeStream(stream); + const wState = stream._writableState; const rState = stream._readableState; @@ -271,8 +271,8 @@ function eos(stream, options, callback) { function eosWeb(stream, opts, callback) { PromisePrototypeThen( stream[kState].streamClosed.promise, - () => process.nextTick(callback, stream), - (err) => process.nextTick(callback, stream, err), + () => process.nextTick(() => callback.call(stream)), + (err) => process.nextTick(() => callback.call(stream, err)), ); return nop; } diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8b903ece4317f8..56e265a960562d 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -626,7 +626,8 @@ function TransferredReadableStream() { writable: undefined, port: undefined, promise: undefined, - } + }, + streamClosed: createDeferredPromise(), }; }, [], ReadableStream)); @@ -1196,7 +1197,8 @@ function createTeeReadableStream(start, pull, cancel) { writable: undefined, port: undefined, promise: undefined, - } + }, + streamClosed: createDeferredPromise(), }; setupReadableStreamDefaultControllerFromSource( this, @@ -1870,7 +1872,7 @@ function readableStreamCancel(stream, reason) { function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; - stream[kState].streamClosed?.resolve?.(); + stream[kState].streamClosed.resolve(); const { reader, @@ -1892,6 +1894,8 @@ function readableStreamError(stream, error) { assert(stream[kState].state === 'readable'); stream[kState].state = 'errored'; stream[kState].storedError = error; + stream[kState].streamClosed.reject(error); + setPromiseHandled(stream[kState].streamClosed.promise); const { reader @@ -1902,10 +1906,6 @@ function readableStreamError(stream, error) { reader[kState].close.reject(error); setPromiseHandled(reader[kState].close.promise); - if (stream[kState].streamClosed?.promise !== undefined) { - stream[kState].streamClosed?.reject?.(error); - setPromiseHandled(stream[kState].streamClosed?.promise); - } if (readableStreamHasDefaultReader(stream)) { for (let n = 0; n < reader[kState].readRequests.length; n++) diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 19e645da1a4b89..7cc06f6fbc3805 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -347,6 +347,7 @@ function TransferredWritableStream() { port2: undefined, readable: undefined, }, + streamClosed: createDeferredPromise() }; }, [], WritableStream)); From 1572862cedc2a0f9f87c5abeb4a888c7cf7bd6fe Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 01:23:04 +0530 Subject: [PATCH 08/16] fixup! remove nullish in writablestreams --- lib/internal/webstreams/writablestream.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 7cc06f6fbc3805..c263d0b1b37ea9 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -728,6 +728,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { resolve: undefined, }; } + + stream[kState].streamClosed.reject(stream[kState]?.storedError); + setPromiseHandled(stream[kState].streamClosed.promise); + const { writer, } = stream[kState]; @@ -735,10 +739,6 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { writer[kState].close.reject?.(stream[kState].storedError); setPromiseHandled(writer[kState].close.promise); } - if (stream[kState].streamClosed?.promise !== undefined) { - stream[kState].streamClosed.reject?.(stream[kState]?.storedError); - setPromiseHandled(stream[kState].streamClosed?.promise); - } } function writableStreamMarkFirstWriteRequestInFlight(stream) { From ebeab98213887a84ac830f289b279de618c52e30 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 02:11:46 +0530 Subject: [PATCH 09/16] fixup! add tests --- test/parallel/test-webstreams-finished.js | 197 +++++++++++++++++++++- 1 file changed, 196 insertions(+), 1 deletion(-) diff --git a/test/parallel/test-webstreams-finished.js b/test/parallel/test-webstreams-finished.js index c2c1e03564a6f7..abf15b6158a480 100644 --- a/test/parallel/test-webstreams-finished.js +++ b/test/parallel/test-webstreams-finished.js @@ -4,6 +4,7 @@ const common = require('../common'); const assert = require('assert'); const { ReadableStream, WritableStream } = require('stream/web'); const { finished } = require('stream'); +const { finished: finishedPromise } = require('stream/promises'); { const rs = new ReadableStream({ @@ -23,18 +24,212 @@ const { finished } = require('stream'); test(); } +{ + const rs = new ReadableStream({ + start(controller) { + controller.error(new Error('asd')); + } + }); + + finished(rs, common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); +} + +{ + const rs = new ReadableStream({ + async start(controller) { + throw new Error('asd'); + } + }); + + finished(rs, common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }); + + async function test() { + const values = []; + for await (const chunk of rs) { + values.push(chunk); + } + assert.deepStrictEqual(values, ['asd']); + } + + finishedPromise(rs).then(common.mustSucceed()); + + test(); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.error(new Error('asd')); + } + }); + + finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); +} + +{ + const rs = new ReadableStream({ + async start(controller) { + throw new Error('asd'); + } + }); + + finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }); + + const { 0: s1, 1: s2 } = rs.tee(); + + finished(s1, common.mustSucceed()); + finished(s2, common.mustSucceed()); + + async function test(stream) { + const values = []; + for await (const chunk of stream) { + values.push(chunk); + } + assert.deepStrictEqual(values, ['asd']); + } + + Promise.all([ + test(s1), + test(s2), + ]).then(common.mustCall()); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.error(new Error('asd')); + } + }); + + const { 0: s1, 1: s2 } = rs.tee(); + + finished(s1, common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); + + finished(s2, common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }); + + finished(rs, common.mustSucceed()); + + rs.cancel(); +} + { let str = ''; const ws = new WritableStream({ write(chunk) { - console.log(chunk); str += chunk; } }); + finished(ws, common.mustSucceed(() => { assert.strictEqual(str, 'asd'); })); + const writer = ws.getWriter(); writer.write('asd'); writer.close(); } + +{ + const ws = new WritableStream({ + async write(chunk) { + throw new Error('asd'); + } + }); + + finished(ws, common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); + + const writer = ws.getWriter(); + writer.write('asd').catch((err) => { + assert.strictEqual(err?.message, "asd"); + }); +} + +{ + let str = ''; + const ws = new WritableStream({ + write(chunk) { + str += chunk; + } + }); + + finishedPromise(ws).then(common.mustSucceed(() => { + assert.strictEqual(str, 'asd'); + })); + + const writer = ws.getWriter(); + writer.write('asd'); + writer.close(); +} + +{ + let str = ''; + const ws = new WritableStream({ + write(chunk) { + str += chunk; + } + }); + finished(ws, common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); + + const writer = ws.getWriter(); + writer.abort(new Error('asd')); +} + +{ + const ws = new WritableStream({ + async write(chunk) { + throw new Error('asd'); + } + }); + + finishedPromise(ws).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, "asd"); + })); + + const writer = ws.getWriter(); + writer.write('asd').catch((err) => { + assert.strictEqual(err?.message, "asd"); + }); +} \ No newline at end of file From dd5391bc6be986faa37a844b7ec8902fcf768a81 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 02:15:22 +0530 Subject: [PATCH 10/16] fixup! remove util changes --- lib/internal/streams/utils.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 8494ca66fcb5eb..4d4f00ab456fa7 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -83,7 +83,7 @@ function isWritableEnded(stream) { // Have emitted 'finish'. function isWritableFinished(stream, strict) { - if (!isWritableNodeStream(stream)) return stream?.state === 'closed' ? true : null; + if (!isWritableNodeStream(stream)) return null; if (stream.writableFinished === true) return true; const wState = stream._writableState; if (wState?.errored) return false; @@ -106,7 +106,7 @@ function isReadableEnded(stream) { // Have emitted 'end'. function isReadableFinished(stream, strict) { - if (!isReadableNodeStream(stream)) return stream?.state === 'closed' ? true : null; + if (!isReadableNodeStream(stream)) return null; const rState = stream._readableState; if (rState?.errored) return false; if (typeof rState?.endEmitted !== 'boolean') return null; @@ -155,7 +155,7 @@ function isFinished(stream, opts) { function isWritableErrored(stream) { if (!isNodeStream(stream)) { - return stream?.state === 'errored' ? true : null; + return null; } if (stream.writableErrored) { @@ -167,7 +167,7 @@ function isWritableErrored(stream) { function isReadableErrored(stream) { if (!isNodeStream(stream)) { - return stream?.state === 'errored' ? true : null; + return null; } if (stream.readableErrored) { @@ -179,7 +179,7 @@ function isReadableErrored(stream) { function isClosed(stream) { if (!isNodeStream(stream)) { - return stream?.state === 'closed' ? true : null; + return null; } if (typeof stream.closed === 'boolean') { From 9aadb9eb4779162bff2a7fbb9acc5f7038a64fc9 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 02:17:50 +0530 Subject: [PATCH 11/16] fixup! lint fix --- test/parallel/test-webstreams-finished.js | 29 ++++++++++------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/test/parallel/test-webstreams-finished.js b/test/parallel/test-webstreams-finished.js index abf15b6158a480..65a14d863eb922 100644 --- a/test/parallel/test-webstreams-finished.js +++ b/test/parallel/test-webstreams-finished.js @@ -32,7 +32,7 @@ const { finished: finishedPromise } = require('stream/promises'); }); finished(rs, common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); } @@ -44,7 +44,7 @@ const { finished: finishedPromise } = require('stream/promises'); }); finished(rs, common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); } @@ -77,7 +77,7 @@ const { finished: finishedPromise } = require('stream/promises'); }); finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); } @@ -89,7 +89,7 @@ const { finished: finishedPromise } = require('stream/promises'); }); finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); } @@ -130,11 +130,11 @@ const { finished: finishedPromise } = require('stream/promises'); const { 0: s1, 1: s2 } = rs.tee(); finished(s1, common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); finished(s2, common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); } @@ -176,12 +176,12 @@ const { finished: finishedPromise } = require('stream/promises'); }); finished(ws, common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); const writer = ws.getWriter(); writer.write('asd').catch((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); }); } @@ -203,14 +203,11 @@ const { finished: finishedPromise } = require('stream/promises'); } { - let str = ''; const ws = new WritableStream({ - write(chunk) { - str += chunk; - } + write(chunk) { } }); finished(ws, common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); const writer = ws.getWriter(); @@ -225,11 +222,11 @@ const { finished: finishedPromise } = require('stream/promises'); }); finishedPromise(ws).then(common.mustNotCall()).catch(common.mustCall((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); })); const writer = ws.getWriter(); writer.write('asd').catch((err) => { - assert.strictEqual(err?.message, "asd"); + assert.strictEqual(err?.message, 'asd'); }); -} \ No newline at end of file +} From e924b6075b582ab5a9fc565b5cea940f94ee3029 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 02:27:29 +0530 Subject: [PATCH 12/16] fixup! rename streamClosed to closed --- lib/internal/streams/end-of-stream.js | 2 +- lib/internal/webstreams/readablestream.js | 12 ++++++------ lib/internal/webstreams/writablestream.js | 10 +++++----- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 135bd4e3a0c748..0aadee16d11dc9 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -270,7 +270,7 @@ function eos(stream, options, callback) { function eosWeb(stream, opts, callback) { PromisePrototypeThen( - stream[kState].streamClosed.promise, + stream[kState].closed.promise, () => process.nextTick(() => callback.call(stream)), (err) => process.nextTick(() => callback.call(stream, err)), ); diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 56e265a960562d..44e55b9c184e51 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -232,7 +232,7 @@ class ReadableStream { port2: undefined, promise: undefined, }, - streamClosed: createDeferredPromise(), + closed: createDeferredPromise(), }; // The spec requires handling of the strategy first @@ -627,7 +627,7 @@ function TransferredReadableStream() { port: undefined, promise: undefined, }, - streamClosed: createDeferredPromise(), + closed: createDeferredPromise(), }; }, [], ReadableStream)); @@ -1198,7 +1198,7 @@ function createTeeReadableStream(start, pull, cancel) { port: undefined, promise: undefined, }, - streamClosed: createDeferredPromise(), + closed: createDeferredPromise(), }; setupReadableStreamDefaultControllerFromSource( this, @@ -1872,7 +1872,7 @@ function readableStreamCancel(stream, reason) { function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; - stream[kState].streamClosed.resolve(); + stream[kState].closed.resolve(); const { reader, @@ -1894,8 +1894,8 @@ function readableStreamError(stream, error) { assert(stream[kState].state === 'readable'); stream[kState].state = 'errored'; stream[kState].storedError = error; - stream[kState].streamClosed.reject(error); - setPromiseHandled(stream[kState].streamClosed.promise); + stream[kState].closed.reject(error); + setPromiseHandled(stream[kState].closed.promise); const { reader diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index c263d0b1b37ea9..583d3c7261d58a 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -176,7 +176,7 @@ class WritableStream { port2: undefined, promise: undefined, }, - streamClosed: createDeferredPromise(), + closed: createDeferredPromise(), }; const size = extractSizeAlgorithm(strategy?.size); @@ -347,7 +347,7 @@ function TransferredWritableStream() { port2: undefined, readable: undefined, }, - streamClosed: createDeferredPromise() + closed: createDeferredPromise() }; }, [], WritableStream)); @@ -729,8 +729,8 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { }; } - stream[kState].streamClosed.reject(stream[kState]?.storedError); - setPromiseHandled(stream[kState].streamClosed.promise); + stream[kState].closed.reject(stream[kState]?.storedError); + setPromiseHandled(stream[kState].closed.promise); const { writer, @@ -845,7 +845,7 @@ function writableStreamFinishInFlightClose(stream) { stream[kState].state = 'closed'; if (stream[kState].writer !== undefined) stream[kState].writer[kState].close.resolve?.(); - stream[kState].streamClosed?.resolve?.(); + stream[kState].closed?.resolve?.(); assert(stream[kState].pendingAbortRequest.abort.promise === undefined); assert(stream[kState].storedError === undefined); } From f1cbaea6feed7b35d307a5ba76055604c0517cc0 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 15:37:39 +0530 Subject: [PATCH 13/16] fixup! remove nullish Co-authored-by: Robert Nagy --- lib/internal/webstreams/writablestream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 583d3c7261d58a..31d9702d991180 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -845,7 +845,7 @@ function writableStreamFinishInFlightClose(stream) { stream[kState].state = 'closed'; if (stream[kState].writer !== undefined) stream[kState].writer[kState].close.resolve?.(); - stream[kState].closed?.resolve?.(); + stream[kState].closed.resolve?.(); assert(stream[kState].pendingAbortRequest.abort.promise === undefined); assert(stream[kState].storedError === undefined); } From 5ae4f8a6ee713cc45f3e6b1b76f9860bf4bf0fea Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 18:53:52 +0530 Subject: [PATCH 14/16] fixup! use symbols instead of kState --- lib/internal/streams/end-of-stream.js | 4 ++-- lib/internal/streams/utils.js | 4 ++++ lib/internal/webstreams/readablestream.js | 14 ++++++++------ lib/internal/webstreams/writablestream.js | 15 ++++++++++----- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 0aadee16d11dc9..165d296ec1da92 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -36,11 +36,11 @@ const { isWritableErrored, isNodeStream, willEmitClose: _willEmitClose, + kIsClosed, } = require('internal/streams/utils'); const { isBrandCheck, - kState, } = require('internal/webstreams/util'); const isReadableStream = @@ -270,7 +270,7 @@ function eos(stream, options, callback) { function eosWeb(stream, opts, callback) { PromisePrototypeThen( - stream[kState].closed.promise, + stream[kIsClosed].promise, () => process.nextTick(() => callback.call(stream)), (err) => process.nextTick(() => callback.call(stream, err)), ); diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 4d4f00ab456fa7..47f23e6a71e881 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -4,6 +4,7 @@ const { Symbol, SymbolAsyncIterator, SymbolIterator, + SymbolFor, } = primordials; const kDestroyed = Symbol('kDestroyed'); @@ -11,6 +12,8 @@ const kIsErrored = Symbol('kIsErrored'); const kIsReadable = Symbol('kIsReadable'); const kIsDisturbed = Symbol('kIsDisturbed'); +const kIsClosed = SymbolFor('nodejs.webstream.closed'); + function isReadableNodeStream(obj, strict = false) { return !!( obj && @@ -269,6 +272,7 @@ module.exports = { kIsErrored, isReadable, kIsReadable, + kIsClosed, isClosed, isDestroyed, isDuplexNodeStream, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 44e55b9c184e51..00bbaefd1f5862 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -84,6 +84,7 @@ const { kIsDisturbed, kIsErrored, kIsReadable, + kIsClosed, } = require('internal/streams/utils'); const { @@ -232,9 +233,10 @@ class ReadableStream { port2: undefined, promise: undefined, }, - closed: createDeferredPromise(), }; + this[kIsClosed] = createDeferredPromise(); + // The spec requires handling of the strategy first // here. Specifically, if getting the size and // highWaterMark from the strategy fail, that has @@ -627,8 +629,8 @@ function TransferredReadableStream() { port: undefined, promise: undefined, }, - closed: createDeferredPromise(), }; + this[kIsClosed] = createDeferredPromise(); }, [], ReadableStream)); } @@ -1198,8 +1200,8 @@ function createTeeReadableStream(start, pull, cancel) { port: undefined, promise: undefined, }, - closed: createDeferredPromise(), }; + this[kIsClosed] = createDeferredPromise(); setupReadableStreamDefaultControllerFromSource( this, ObjectCreate(null, { @@ -1872,7 +1874,7 @@ function readableStreamCancel(stream, reason) { function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; - stream[kState].closed.resolve(); + stream[kIsClosed].resolve(); const { reader, @@ -1894,8 +1896,8 @@ function readableStreamError(stream, error) { assert(stream[kState].state === 'readable'); stream[kState].state = 'errored'; stream[kState].storedError = error; - stream[kState].closed.reject(error); - setPromiseHandled(stream[kState].closed.promise); + stream[kIsClosed].reject(error); + setPromiseHandled(stream[kIsClosed].promise); const { reader diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 31d9702d991180..811684bbb14383 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -67,6 +67,10 @@ const { kState, } = require('internal/webstreams/util'); +const { + kIsClosed, +} = require('internal/streams/utils'); + const { AbortController, } = require('internal/abort_controller'); @@ -176,9 +180,10 @@ class WritableStream { port2: undefined, promise: undefined, }, - closed: createDeferredPromise(), }; + this[kIsClosed] = createDeferredPromise(); + const size = extractSizeAlgorithm(strategy?.size); const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1); @@ -347,8 +352,8 @@ function TransferredWritableStream() { port2: undefined, readable: undefined, }, - closed: createDeferredPromise() }; + this[kIsClosed] = createDeferredPromise(); }, [], WritableStream)); } @@ -729,8 +734,8 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { }; } - stream[kState].closed.reject(stream[kState]?.storedError); - setPromiseHandled(stream[kState].closed.promise); + stream[kIsClosed].reject(stream[kState]?.storedError); + setPromiseHandled(stream[kIsClosed].promise); const { writer, @@ -845,7 +850,7 @@ function writableStreamFinishInFlightClose(stream) { stream[kState].state = 'closed'; if (stream[kState].writer !== undefined) stream[kState].writer[kState].close.resolve?.(); - stream[kState].closed.resolve?.(); + stream[kIsClosed].resolve?.(); assert(stream[kState].pendingAbortRequest.abort.promise === undefined); assert(stream[kState].storedError === undefined); } From 4ab3a7b0bbd778f3db82bdcc952598017c5f2c1e Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 19:24:41 +0530 Subject: [PATCH 15/16] fixup! remove brandchecks --- lib/internal/streams/end-of-stream.js | 11 ++--------- lib/internal/streams/utils.js | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 165d296ec1da92..533dc23d8dccdb 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -28,10 +28,12 @@ const { isClosed, isReadable, isReadableNodeStream, + isReadableStream, isReadableFinished, isReadableErrored, isWritable, isWritableNodeStream, + isWritableStream, isWritableFinished, isWritableErrored, isNodeStream, @@ -39,15 +41,6 @@ const { kIsClosed, } = require('internal/streams/utils'); -const { - isBrandCheck, -} = require('internal/webstreams/util'); - -const isReadableStream = - isBrandCheck('ReadableStream'); -const isWritableStream = - isBrandCheck('WritableStream'); - function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 47f23e6a71e881..2bf38d89b2d552 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -58,6 +58,25 @@ function isNodeStream(obj) { ); } +function isReadableStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.pipeThrough === 'function' && + typeof obj.getReader === 'function' && + typeof obj.cancel === 'function' + ); +} + +function isWritableStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.getWriter === 'function' && + typeof obj.abort === 'function' + ); +} + function isIterable(obj, isAsync) { if (obj == null) return false; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; @@ -279,12 +298,14 @@ module.exports = { isFinished, isIterable, isReadableNodeStream, + isReadableStream, isReadableEnded, isReadableFinished, isReadableErrored, isNodeStream, isWritable, isWritableNodeStream, + isWritableStream, isWritableEnded, isWritableFinished, isWritableErrored, From f1ef9e46dbc0bc5220709a7446c0dbf76936ba76 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 15 Jan 2023 20:03:14 +0530 Subject: [PATCH 16/16] fixup! rename kIsClosed --- lib/internal/streams/end-of-stream.js | 4 ++-- lib/internal/streams/utils.js | 4 ++-- lib/internal/webstreams/readablestream.js | 14 +++++++------- lib/internal/webstreams/writablestream.js | 12 ++++++------ 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 533dc23d8dccdb..07f80aedc69cd5 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -38,7 +38,7 @@ const { isWritableErrored, isNodeStream, willEmitClose: _willEmitClose, - kIsClosed, + kIsClosedPromise, } = require('internal/streams/utils'); function isRequest(stream) { @@ -263,7 +263,7 @@ function eos(stream, options, callback) { function eosWeb(stream, opts, callback) { PromisePrototypeThen( - stream[kIsClosed].promise, + stream[kIsClosedPromise].promise, () => process.nextTick(() => callback.call(stream)), (err) => process.nextTick(() => callback.call(stream, err)), ); diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 2bf38d89b2d552..9d08af6f31a280 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -12,7 +12,7 @@ const kIsErrored = Symbol('kIsErrored'); const kIsReadable = Symbol('kIsReadable'); const kIsDisturbed = Symbol('kIsDisturbed'); -const kIsClosed = SymbolFor('nodejs.webstream.closed'); +const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise'); function isReadableNodeStream(obj, strict = false) { return !!( @@ -291,7 +291,7 @@ module.exports = { kIsErrored, isReadable, kIsReadable, - kIsClosed, + kIsClosedPromise, isClosed, isDestroyed, isDuplexNodeStream, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 00bbaefd1f5862..a5d7b3d49dc594 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -84,7 +84,7 @@ const { kIsDisturbed, kIsErrored, kIsReadable, - kIsClosed, + kIsClosedPromise, } = require('internal/streams/utils'); const { @@ -235,7 +235,7 @@ class ReadableStream { }, }; - this[kIsClosed] = createDeferredPromise(); + this[kIsClosedPromise] = createDeferredPromise(); // The spec requires handling of the strategy first // here. Specifically, if getting the size and @@ -630,7 +630,7 @@ function TransferredReadableStream() { promise: undefined, }, }; - this[kIsClosed] = createDeferredPromise(); + this[kIsClosedPromise] = createDeferredPromise(); }, [], ReadableStream)); } @@ -1201,7 +1201,7 @@ function createTeeReadableStream(start, pull, cancel) { promise: undefined, }, }; - this[kIsClosed] = createDeferredPromise(); + this[kIsClosedPromise] = createDeferredPromise(); setupReadableStreamDefaultControllerFromSource( this, ObjectCreate(null, { @@ -1874,7 +1874,7 @@ function readableStreamCancel(stream, reason) { function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; - stream[kIsClosed].resolve(); + stream[kIsClosedPromise].resolve(); const { reader, @@ -1896,8 +1896,8 @@ function readableStreamError(stream, error) { assert(stream[kState].state === 'readable'); stream[kState].state = 'errored'; stream[kState].storedError = error; - stream[kIsClosed].reject(error); - setPromiseHandled(stream[kIsClosed].promise); + stream[kIsClosedPromise].reject(error); + setPromiseHandled(stream[kIsClosedPromise].promise); const { reader diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 811684bbb14383..f8c5038b025abf 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -68,7 +68,7 @@ const { } = require('internal/webstreams/util'); const { - kIsClosed, + kIsClosedPromise, } = require('internal/streams/utils'); const { @@ -182,7 +182,7 @@ class WritableStream { }, }; - this[kIsClosed] = createDeferredPromise(); + this[kIsClosedPromise] = createDeferredPromise(); const size = extractSizeAlgorithm(strategy?.size); const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1); @@ -353,7 +353,7 @@ function TransferredWritableStream() { readable: undefined, }, }; - this[kIsClosed] = createDeferredPromise(); + this[kIsClosedPromise] = createDeferredPromise(); }, [], WritableStream)); } @@ -734,8 +734,8 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { }; } - stream[kIsClosed].reject(stream[kState]?.storedError); - setPromiseHandled(stream[kIsClosed].promise); + stream[kIsClosedPromise].reject(stream[kState]?.storedError); + setPromiseHandled(stream[kIsClosedPromise].promise); const { writer, @@ -850,7 +850,7 @@ function writableStreamFinishInFlightClose(stream) { stream[kState].state = 'closed'; if (stream[kState].writer !== undefined) stream[kState].writer[kState].close.resolve?.(); - stream[kIsClosed].resolve?.(); + stream[kIsClosedPromise].resolve?.(); assert(stream[kState].pendingAbortRequest.abort.promise === undefined); assert(stream[kState].storedError === undefined); }