diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 2c7ec0f47a5648..04360116b207df 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -354,7 +354,6 @@ resolveDns, }; window.__bootstrap.streamUtils = { - readableStreamForRid, writableStreamForRid, }; })(this); diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index bd3b79149b7329..cbf781b53fd1d9 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -648,29 +648,76 @@ const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB /** - * @callback unrefCallback - * @param {Promise} promise - * @returns {undefined} + * Create a new ReadableStream object that is backed by a Resource that + * implements `Resource::read_return`. This object contains enough metadata to + * allow callers to bypass the JavaScript ReadableStream implementation and + * read directly from the underlying resource if they so choose (FastStream). + * + * @param {number} rid The resource ID to read from. + * @returns {ReadableStream} */ + function readableStreamForRid(rid) { + const stream = webidl.createBranded(ReadableStream); + stream[_maybeRid] = rid; + const underlyingSource = { + type: "bytes", + async pull(controller) { + const v = controller.byobRequest.view; + try { + const bytesRead = await core.read(rid, v); + if (bytesRead === 0) { + core.tryClose(rid); + controller.close(); + controller.byobRequest.respond(0); + } else { + controller.byobRequest.respond(bytesRead); + } + } catch (e) { + controller.error(e); + core.tryClose(rid); + } + }, + cancel() { + core.tryClose(rid); + }, + autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, + }; + initializeReadableStream(stream); + setUpReadableByteStreamControllerFromUnderlyingSource( + stream, + underlyingSource, + underlyingSource, + 0, + ); + return stream; + } + + const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); + const _isUnref = Symbol("isUnref"); /** - * @param {number} rid - * @param {unrefCallback=} unrefCallback + * Create a new ReadableStream object that is backed by a Resource that + * implements `Resource::read_return`. This readable stream supports being + * refed and unrefed by calling `readableStreamForRidUnrefableRef` and + * `readableStreamForRidUnrefableUnref` on it. Unrefable streams are not + * FastStream compatible. + * + * @param {number} rid The resource ID to read from. * @returns {ReadableStream} */ - function readableStreamForRid(rid, unrefCallback) { + function readableStreamForRidUnrefable(rid) { const stream = webidl.createBranded(ReadableStream); - stream[_maybeRid] = rid; + stream[promiseIdSymbol] = undefined; + stream[_isUnref] = false; const underlyingSource = { type: "bytes", async pull(controller) { const v = controller.byobRequest.view; try { const promise = core.read(rid, v); - - unrefCallback?.(promise); - + const promiseId = stream[promiseIdSymbol] = promise[promiseIdSymbol]; + if (stream[_isUnref]) core.unrefOp(promiseId); const bytesRead = await promise; - + stream[promiseIdSymbol] = undefined; if (bytesRead === 0) { core.tryClose(rid); controller.close(); @@ -695,10 +742,25 @@ underlyingSource, 0, ); - return stream; } + function readableStreamForRidUnrefableRef(stream) { + if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream"); + stream[_isUnref] = false; + if (stream[promiseIdSymbol] !== undefined) { + core.refOp(stream[promiseIdSymbol]); + } + } + + function readableStreamForRidUnrefableUnref(stream) { + if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream"); + stream[_isUnref] = true; + if (stream[promiseIdSymbol] !== undefined) { + core.unrefOp(stream[promiseIdSymbol]); + } + } + function getReadableStreamRid(stream) { return stream[_maybeRid]; } @@ -5921,6 +5983,9 @@ readableStreamClose, readableStreamDisturb, readableStreamForRid, + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, getReadableStreamRid, Deferred, // Exposed in global runtime scope diff --git a/runtime/js/40_spawn.js b/runtime/js/40_spawn.js index daa4f8ff880dd5..99661bf1a9a7b9 100644 --- a/runtime/js/40_spawn.js +++ b/runtime/js/40_spawn.js @@ -16,8 +16,12 @@ PromiseAll, SymbolFor, } = window.__bootstrap.primordials; - const { readableStreamForRid, writableStreamForRid } = - window.__bootstrap.streamUtils; + const { + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, + } = window.__bootstrap.streams; + const { writableStreamForRid } = window.__bootstrap.streamUtils; const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); @@ -136,18 +140,12 @@ if (stdoutRid !== null) { this.#stdoutRid = stdoutRid; - this.#stdout = readableStreamForRid(stdoutRid, (promise) => { - this.#stdoutPromiseId = promise[promiseIdSymbol]; - if (this.#unrefed) core.unrefOp(this.#stdoutPromiseId); - }); + this.#stdout = readableStreamForRidUnrefable(stdoutRid); } if (stderrRid !== null) { this.#stderrRid = stderrRid; - this.#stderr = readableStreamForRid(stderrRid, (promise) => { - this.#stderrPromiseId = promise[promiseIdSymbol]; - if (this.#unrefed) core.unrefOp(this.#stderrPromiseId); - }); + this.#stderr = readableStreamForRidUnrefable(stderrRid); } const onAbort = () => this.kill("SIGTERM"); @@ -214,15 +212,15 @@ ref() { this.#unrefed = false; core.refOp(this.#waitPromiseId); - if (this.#stdoutPromiseId) core.refOp(this.#stdoutPromiseId); - if (this.#stderrPromiseId) core.refOp(this.#stderrPromiseId); + if (this.#stdout) readableStreamForRidUnrefableRef(this.#stdout); + if (this.#stderr) readableStreamForRidUnrefableRef(this.#stderr); } unref() { this.#unrefed = true; core.unrefOp(this.#waitPromiseId); - if (this.#stdoutPromiseId) core.unrefOp(this.#stdoutPromiseId); - if (this.#stderrPromiseId) core.unrefOp(this.#stderrPromiseId); + if (this.#stdout) readableStreamForRidUnrefableUnref(this.#stdout); + if (this.#stderr) readableStreamForRidUnrefableUnref(this.#stderr); } }