Skip to content

Commit

Permalink
fix(runtime): no FastStream for unrefable streams (#16095)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucacasonato authored Sep 29, 2022
1 parent 927f4e2 commit 38f5445
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 27 deletions.
1 change: 0 additions & 1 deletion ext/net/01_net.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@
resolveDns,
};
window.__bootstrap.streamUtils = {
readableStreamForRid,
writableStreamForRid,
};
})(this);
89 changes: 77 additions & 12 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -648,29 +648,76 @@
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB

/**
* @callback unrefCallback
* @param {Promise} promise
* @returns {undefined}
* Create a new ReadableStream object that is backed by a Resource that
* implements `Resource::read_return`. This object contains enough metadata to
* allow callers to bypass the JavaScript ReadableStream implementation and
* read directly from the underlying resource if they so choose (FastStream).
*
* @param {number} rid The resource ID to read from.
* @returns {ReadableStream<Uint8Array>}
*/
function readableStreamForRid(rid) {
const stream = webidl.createBranded(ReadableStream);
stream[_maybeRid] = rid;
const underlyingSource = {
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const bytesRead = await core.read(rid, v);
if (bytesRead === 0) {
core.tryClose(rid);
controller.close();
controller.byobRequest.respond(0);
} else {
controller.byobRequest.respond(bytesRead);
}
} catch (e) {
controller.error(e);
core.tryClose(rid);
}
},
cancel() {
core.tryClose(rid);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
};
initializeReadableStream(stream);
setUpReadableByteStreamControllerFromUnderlyingSource(
stream,
underlyingSource,
underlyingSource,
0,
);
return stream;
}

const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
const _isUnref = Symbol("isUnref");
/**
* @param {number} rid
* @param {unrefCallback=} unrefCallback
* Create a new ReadableStream object that is backed by a Resource that
* implements `Resource::read_return`. This readable stream supports being
* refed and unrefed by calling `readableStreamForRidUnrefableRef` and
* `readableStreamForRidUnrefableUnref` on it. Unrefable streams are not
* FastStream compatible.
*
* @param {number} rid The resource ID to read from.
* @returns {ReadableStream<Uint8Array>}
*/
function readableStreamForRid(rid, unrefCallback) {
function readableStreamForRidUnrefable(rid) {
const stream = webidl.createBranded(ReadableStream);
stream[_maybeRid] = rid;
stream[promiseIdSymbol] = undefined;
stream[_isUnref] = false;
const underlyingSource = {
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const promise = core.read(rid, v);

unrefCallback?.(promise);

const promiseId = stream[promiseIdSymbol] = promise[promiseIdSymbol];
if (stream[_isUnref]) core.unrefOp(promiseId);
const bytesRead = await promise;

stream[promiseIdSymbol] = undefined;
if (bytesRead === 0) {
core.tryClose(rid);
controller.close();
Expand All @@ -695,10 +742,25 @@
underlyingSource,
0,
);

return stream;
}

function readableStreamForRidUnrefableRef(stream) {
if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
stream[_isUnref] = false;
if (stream[promiseIdSymbol] !== undefined) {
core.refOp(stream[promiseIdSymbol]);
}
}

function readableStreamForRidUnrefableUnref(stream) {
if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
stream[_isUnref] = true;
if (stream[promiseIdSymbol] !== undefined) {
core.unrefOp(stream[promiseIdSymbol]);
}
}

function getReadableStreamRid(stream) {
return stream[_maybeRid];
}
Expand Down Expand Up @@ -5921,6 +5983,9 @@
readableStreamClose,
readableStreamDisturb,
readableStreamForRid,
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
getReadableStreamRid,
Deferred,
// Exposed in global runtime scope
Expand Down
26 changes: 12 additions & 14 deletions runtime/js/40_spawn.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
PromiseAll,
SymbolFor,
} = window.__bootstrap.primordials;
const { readableStreamForRid, writableStreamForRid } =
window.__bootstrap.streamUtils;
const {
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
} = window.__bootstrap.streams;
const { writableStreamForRid } = window.__bootstrap.streamUtils;

const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");

Expand Down Expand Up @@ -136,18 +140,12 @@

if (stdoutRid !== null) {
this.#stdoutRid = stdoutRid;
this.#stdout = readableStreamForRid(stdoutRid, (promise) => {
this.#stdoutPromiseId = promise[promiseIdSymbol];
if (this.#unrefed) core.unrefOp(this.#stdoutPromiseId);
});
this.#stdout = readableStreamForRidUnrefable(stdoutRid);
}

if (stderrRid !== null) {
this.#stderrRid = stderrRid;
this.#stderr = readableStreamForRid(stderrRid, (promise) => {
this.#stderrPromiseId = promise[promiseIdSymbol];
if (this.#unrefed) core.unrefOp(this.#stderrPromiseId);
});
this.#stderr = readableStreamForRidUnrefable(stderrRid);
}

const onAbort = () => this.kill("SIGTERM");
Expand Down Expand Up @@ -214,15 +212,15 @@
ref() {
this.#unrefed = false;
core.refOp(this.#waitPromiseId);
if (this.#stdoutPromiseId) core.refOp(this.#stdoutPromiseId);
if (this.#stderrPromiseId) core.refOp(this.#stderrPromiseId);
if (this.#stdout) readableStreamForRidUnrefableRef(this.#stdout);
if (this.#stderr) readableStreamForRidUnrefableRef(this.#stderr);
}

unref() {
this.#unrefed = true;
core.unrefOp(this.#waitPromiseId);
if (this.#stdoutPromiseId) core.unrefOp(this.#stdoutPromiseId);
if (this.#stderrPromiseId) core.unrefOp(this.#stderrPromiseId);
if (this.#stdout) readableStreamForRidUnrefableUnref(this.#stdout);
if (this.#stderr) readableStreamForRidUnrefableUnref(this.#stderr);
}
}

Expand Down

0 comments on commit 38f5445

Please sign in to comment.