Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fetch): accept async iterables for body #24623

Merged
merged 20 commits into from
Aug 6, 2024
13 changes: 13 additions & 0 deletions ext/fetch/22_body.js
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ function extractBody(object) {
if (object.locked || isReadableStreamDisturbed(object)) {
throw new TypeError("ReadableStream is locked or disturbed");
}
} else if (object[webidl.AsyncIterable] === webidl.AsyncIterable) {
stream = ReadableStream.from(object.open());
}
if (typeof source === "string") {
// WARNING: this deviates from spec (expects length to be set)
Expand All @@ -475,6 +477,9 @@ function extractBody(object) {
return { body, contentType };
}

webidl.converters["async iterable<Uint8Array>"] = webidl
.createAsyncIterableConverter(webidl.converters.Uint8Array);

webidl.converters["BodyInit_DOMString"] = (V, prefix, context, opts) => {
// Union for (ReadableStream or Blob or ArrayBufferView or ArrayBuffer or FormData or URLSearchParams or USVString)
if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, V)) {
Expand All @@ -493,6 +498,14 @@ webidl.converters["BodyInit_DOMString"] = (V, prefix, context, opts) => {
if (ArrayBufferIsView(V)) {
return webidl.converters["ArrayBufferView"](V, prefix, context, opts);
}
if (webidl.isAsyncIterator(V)) {
return webidl.converters["async iterable<Uint8Array>"](
V,
prefix,
context,
opts,
);
}
}
// BodyInit conversion is passed to extractBody(), which calls core.encode().
// core.encode() will UTF-8 encode strings with replacement, being equivalent to the USV normalization.
Expand Down
1 change: 1 addition & 0 deletions ext/fetch/lib.deno_fetch.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ declare type BodyInit =
| FormData
| URLSearchParams
| ReadableStream<Uint8Array>
| AsyncIterable<Uint8Array>
| string;
/** @category Fetch */
declare type RequestDestination =
Expand Down
63 changes: 15 additions & 48 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ const {
String,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
SymbolFor,
TypeError,
TypedArrayPrototypeGetBuffer,
Expand Down Expand Up @@ -5083,34 +5082,6 @@ function initializeCountSizeFunction(globalObject) {
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}

// Ref: https://tc39.es/ecma262/#sec-getiterator
function getAsyncOrSyncIterator(obj) {
let iterator;
if (obj[SymbolAsyncIterator] != null) {
iterator = obj[SymbolAsyncIterator]();
if (!isObject(iterator)) {
throw new TypeError(
"[Symbol.asyncIterator] returned a non-object value",
);
}
} else if (obj[SymbolIterator] != null) {
iterator = obj[SymbolIterator]();
if (!isObject(iterator)) {
throw new TypeError("[Symbol.iterator] returned a non-object value");
}
} else {
throw new TypeError("No iterator found");
}
if (typeof iterator.next !== "function") {
throw new TypeError("iterator.next is not a function");
}
return iterator;
}

function isObject(x) {
return (typeof x === "object" && x != null) || typeof x === "function";
}

const _resourceBacking = Symbol("[[resourceBacking]]");
// This distinction exists to prevent unrefable streams being used in
// regular fast streams that are unaware of refability
Expand Down Expand Up @@ -5196,21 +5167,22 @@ class ReadableStream {
}

static from(asyncIterable) {
const prefix = "Failed to execute 'ReadableStream.from'";
webidl.requiredArguments(
arguments.length,
1,
"Failed to execute 'ReadableStream.from'",
prefix,
);
asyncIterable = webidl.converters.any(asyncIterable);

const iterator = getAsyncOrSyncIterator(asyncIterable);
asyncIterable = webidl.converters["async iterable<any>"](
asyncIterable,
prefix,
"Argument 1",
);
const iter = asyncIterable.open();

const stream = createReadableStream(noop, async () => {
// deno-lint-ignore prefer-primordials
const res = await iterator.next();
if (!isObject(res)) {
throw new TypeError("iterator.next value is not an object");
}
const res = await iter.next();
if (res.done) {
readableStreamDefaultControllerClose(stream[_controller]);
} else {
Expand All @@ -5220,17 +5192,8 @@ class ReadableStream {
);
}
}, async (reason) => {
if (iterator.return == null) {
return undefined;
} else {
// deno-lint-ignore prefer-primordials
const res = await iterator.return(reason);
if (!isObject(res)) {
throw new TypeError("iterator.return value is not an object");
} else {
return undefined;
}
}
// deno-lint-ignore prefer-primordials
await iter.return(reason);
}, 0);
return stream;
}
Expand Down Expand Up @@ -6890,6 +6853,10 @@ webidl.converters.StreamPipeOptions = webidl
{ key: "signal", converter: webidl.converters.AbortSignal },
]);

webidl.converters["async iterable<any>"] = webidl.createAsyncIterableConverter(
webidl.converters.any,
);

internals.resourceForReadableStream = resourceForReadableStream;

export {
Expand Down
126 changes: 126 additions & 0 deletions ext/webidl/00_webidl.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const {
Float32Array,
Float64Array,
FunctionPrototypeBind,
FunctionPrototypeCall,
Int16Array,
Int32Array,
Int8Array,
Expand Down Expand Up @@ -77,6 +78,7 @@ const {
StringPrototypeToWellFormed,
Symbol,
SymbolIterator,
SymbolAsyncIterator,
SymbolToStringTag,
TypedArrayPrototypeGetBuffer,
TypedArrayPrototypeGetSymbolToStringTag,
Expand Down Expand Up @@ -919,6 +921,127 @@ function createSequenceConverter(converter) {
};
}

function isAsyncIterator(obj) {
if (obj[SymbolAsyncIterator] === undefined) {
if (obj[SymbolIterator] === undefined) {
return false;
}
}

return true;
}

const AsyncIterable = Symbol("[[asyncIterable]]");

function createAsyncIterableConverter(converter) {
return function (
V,
prefix = undefined,
context = undefined,
opts = { __proto__: null },
) {
if (type(V) !== "Object") {
throw makeException(
TypeError,
"can not be converted to async iterable.",
prefix,
context,
);
}

let isAsync = true;
let method = V[SymbolAsyncIterator];
if (method === undefined) {
method = V[SymbolIterator];

if (method === undefined) {
throw makeException(
TypeError,
"is not iterable.",
prefix,
context,
);
}

isAsync = false;
}

return {
value: V,
[AsyncIterable]: AsyncIterable,
open(context) {
const iter = FunctionPrototypeCall(method, V);
if (type(iter) !== "Object") {
throw new TypeError(
`${context} could not be iterated because iterator method did not return object, but ${
type(iter)
}.`,
);
}

let asyncIterator = iter;

if (!isAsync) {
asyncIterator = {
// deno-lint-ignore require-await
async next() {
// deno-lint-ignore prefer-primordials
return iter.next();
},
};
}

return {
async next() {
// deno-lint-ignore prefer-primordials
const iterResult = await asyncIterator.next();
if (type(iterResult) !== "Object") {
throw TypeError(
`${context} failed to iterate next value because the next() method did not return an object, but ${
type(iterResult)
}.`,
);
}

if (iterResult.done) {
return { done: true };
}

const iterValue = converter(
iterResult.value,
`${context} failed to iterate next value`,
`The value returned from the next() method`,
opts,
);

return { done: false, value: iterValue };
},
async return(reason) {
if (asyncIterator.return === undefined) {
return undefined;
}

// deno-lint-ignore prefer-primordials
const returnPromiseResult = await asyncIterator.return(reason);
if (type(returnPromiseResult) !== "Object") {
throw TypeError(
`${context} failed to close iterator because the return() method did not return an object, but ${
type(returnPromiseResult)
}.`,
);
}

return undefined;
},
[SymbolAsyncIterator]() {
return this;
},
};
},
};
};
}

function createRecordConverter(keyConverter, valueConverter) {
return (V, prefix, context, opts) => {
if (type(V) !== "Object") {
Expand Down Expand Up @@ -1287,9 +1410,11 @@ function setlike(obj, objPrototype, readonly) {

export {
assertBranded,
AsyncIterable,
brand,
configureInterface,
converters,
createAsyncIterableConverter,
createBranded,
createDictionaryConverter,
createEnumConverter,
Expand All @@ -1300,6 +1425,7 @@ export {
createSequenceConverter,
illegalConstructor,
invokeCallbackFunction,
isAsyncIterator,
makeException,
mixinPairIterable,
requiredArguments,
Expand Down
21 changes: 21 additions & 0 deletions ext/webidl/internal.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,27 @@ declare module "ext:deno_webidl/00_webidl.js" {
opts?: any,
) => T[];

/**
* Create a converter that converts an async iterable of the inner type.
*/
function createAsyncIterableConverter<V, T>(
converter: (
v: V,
prefix?: string,
context?: string,
opts?: any,
) => T,
): (
v: any,
prefix?: string,
context?: string,
opts?: any,
) => ConvertedAsyncIterable<V, T>;

interface ConvertedAsyncIterable<V, T> extends AsyncIterableIterator<T> {
value: V;
}

/**
* Create a converter that converts a Promise of the inner type.
*/
Expand Down
1 change: 1 addition & 0 deletions tests/integration/node_unit_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ util::unit_test_factory!(
dgram_test,
domain_test,
fs_test,
fetch_test,
http_test,
http2_test,
_randomBytes_test = internal / _randomBytes_test,
Expand Down
15 changes: 14 additions & 1 deletion tests/unit/streams_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
import { assertEquals, assertRejects, fail } from "./test_util.ts";
import {
assertEquals,
assertRejects,
assertThrows,
fail,
} from "./test_util.ts";

const {
core,
Expand Down Expand Up @@ -533,3 +538,11 @@ Deno.test(async function decompressionStreamInvalidGzipStillReported() {
"corrupt gzip stream does not have a matching checksum",
);
});

Deno.test(function readableStreamFromWithStringThrows() {
assertThrows(
() => ReadableStream.from("string"),
TypeError,
"Failed to execute 'ReadableStream.from': Argument 1 can not be converted to async iterable.",
);
});
18 changes: 18 additions & 0 deletions tests/unit_node/fetch_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { assertEquals } from "@std/assert";
import { createReadStream } from "node:fs";

Deno.test("fetch node stream", async () => {
const file = createReadStream("tests/testdata/assets/fixture.json");

const response = await fetch("http://localhost:4545/echo_server", {
method: "POST",
body: file,
});

assertEquals(
await response.text(),
await Deno.readTextFile("tests/testdata/assets/fixture.json"),
);
});
Loading