From cc80a2d1908058a6e5e3f4e846b52c44d8d1fa61 Mon Sep 17 00:00:00 2001
From: Bram Kamies <146706532+passionate-bram@users.noreply.github.com>
Date: Sat, 24 Feb 2024 22:53:36 +0100
Subject: [PATCH] feat(response): add `sendIterable` util (#655)
---
docs/2.utils/2.reponse.md | 29 ++++++
src/utils/internal/iteratable.ts | 64 +++++++++++++
src/utils/response.ts | 70 ++++++++++++++
test/iteratable.test.ts | 153 +++++++++++++++++++++++++++++++
test/utils.test.ts | 137 ++++++++++++++++++++++++++-
5 files changed, 452 insertions(+), 1 deletion(-)
create mode 100644 src/utils/internal/iteratable.ts
create mode 100644 test/iteratable.test.ts
diff --git a/docs/2.utils/2.reponse.md b/docs/2.utils/2.reponse.md
index a1b2acb9..8342433b 100644
--- a/docs/2.utils/2.reponse.md
+++ b/docs/2.utils/2.reponse.md
@@ -65,6 +65,35 @@ Remove a response header by name.
Directly send a response to the client.
**Note:** This function should be used only when you want to send a response directly without using the `h3` event. Normaly you can directly `return` a value inside event handlers.
+### `sendIterable(event, iterable)`
+
+Iterate a source of chunks and send back each chunk in order. Supports mixing async work toghether with emitting chunks.
+Each chunk must be a string or a buffer.
+For generator (yielding) functions, the returned value is treated the same as yielded values.
+
+**Example:**
+
+```ts
+sendIterable(event, work());
+async function* work() {
+ // Open document body
+ yield "\n
Executing...
\n";
+ // Do work ...
+ for (let i = 0; i < 1000) {
+ await delay(1000);
+ // Report progress
+ yield `- Completed job #`;
+ yield i;
+ yield `
\n`;
+ }
+ // Close out the report
+ return `
`;
+}
+async function delay(ms) {
+ return new Promise(resolve => setTimeout(resolve, ms));
+}
+```
+
### `sendNoContent(event, code?)`
Respond with an empty payload.
diff --git a/src/utils/internal/iteratable.ts b/src/utils/internal/iteratable.ts
new file mode 100644
index 00000000..68f9aa2b
--- /dev/null
+++ b/src/utils/internal/iteratable.ts
@@ -0,0 +1,64 @@
+export type IterationSource =
+ | Iterable
+ | AsyncIterable
+ | Iterator
+ | AsyncIterator
+ | (() =>
+ | Iterator
+ | AsyncIterator);
+
+type SendableValue = string | Buffer | Uint8Array;
+export type IteratorSerializer = (
+ value: Value,
+) => SendableValue | undefined;
+
+/**
+ * The default implementation for {@link sendIterable}'s `serializer` argument.
+ * It serializes values as follows:
+ * - Instances of {@link String}, {@link Uint8Array} and `undefined` are returned as-is.
+ * - Objects are serialized through {@link JSON.stringify}.
+ * - Functions are serialized as `undefined`.
+ * - Values of type boolean, number, bigint or symbol are serialized using their `toString` function.
+ *
+ * @param value - The value to serialize to either a string or Uint8Array.
+ */
+export function serializeIterableValue(
+ value: unknown,
+): SendableValue | undefined {
+ switch (typeof value) {
+ case "string": {
+ return value;
+ }
+ case "boolean":
+ case "number":
+ case "bigint":
+ case "symbol": {
+ return value.toString();
+ }
+ case "function":
+ case "undefined": {
+ return undefined;
+ }
+ case "object": {
+ if (value instanceof Uint8Array) {
+ return value;
+ }
+ return JSON.stringify(value);
+ }
+ }
+}
+
+export function coerceIterable(
+ iterable: IterationSource,
+): Iterator | AsyncIterator {
+ if (typeof iterable === "function") {
+ iterable = iterable();
+ }
+ if (Symbol.iterator in iterable) {
+ return iterable[Symbol.iterator]();
+ }
+ if (Symbol.asyncIterator in iterable) {
+ return iterable[Symbol.asyncIterator]();
+ }
+ return iterable;
+}
diff --git a/src/utils/response.ts b/src/utils/response.ts
index 48c27fe5..8ee1fa19 100644
--- a/src/utils/response.ts
+++ b/src/utils/response.ts
@@ -7,6 +7,12 @@ import { MIMES } from "./consts";
import { sanitizeStatusCode, sanitizeStatusMessage } from "./sanitize";
import { splitCookiesString } from "./cookie";
import { hasProp } from "./internal/object";
+import {
+ serializeIterableValue,
+ coerceIterable,
+ IterationSource,
+ IteratorSerializer,
+} from "./internal/iteratable";
const defer =
typeof setImmediate === "undefined" ? (fn: () => any) => fn() : setImmediate;
@@ -451,3 +457,67 @@ export function sendWebResponse(
}
return sendStream(event, response.body);
}
+
+/**
+ * Iterate a source of chunks and send back each chunk in order.
+ * Supports mixing async work toghether with emitting chunks.
+ *
+ * Each chunk must be a string or a buffer.
+ *
+ * For generator (yielding) functions, the returned value is treated the same as yielded values.
+ *
+ * @param event - H3 event
+ * @param iterable - Iterator that produces chunks of the response.
+ * @param serializer - Function that converts values from the iterable into stream-compatible values.
+ * @template Value - Test
+ *
+ * @example
+ * sendIterable(event, work());
+ * async function* work() {
+ * // Open document body
+ * yield "\nExecuting...
\n";
+ * // Do work ...
+ * for (let i = 0; i < 1000) {
+ * await delay(1000);
+ * // Report progress
+ * yield `- Completed job #`;
+ * yield i;
+ * yield `
\n`;
+ * }
+ * // Close out the report
+ * return `
`;
+ * }
+ * async function delay(ms) {
+ * return new Promise(resolve => setTimeout(resolve, ms));
+ * }
+ */
+export function sendIterable(
+ event: H3Event,
+ iterable: IterationSource,
+ options?: {
+ serializer: IteratorSerializer;
+ },
+): Promise {
+ const serializer = options?.serializer ?? serializeIterableValue;
+ const iterator = coerceIterable(iterable);
+ return sendStream(
+ event,
+ new ReadableStream({
+ async pull(controller) {
+ const { value, done } = await iterator.next();
+ if (value !== undefined) {
+ const chunk = serializer(value);
+ if (chunk !== undefined) {
+ controller.enqueue(chunk);
+ }
+ }
+ if (done) {
+ controller.close();
+ }
+ },
+ cancel() {
+ iterator.return?.();
+ },
+ }),
+ );
+}
diff --git a/test/iteratable.test.ts b/test/iteratable.test.ts
new file mode 100644
index 00000000..43bc4077
--- /dev/null
+++ b/test/iteratable.test.ts
@@ -0,0 +1,153 @@
+import { ReadableStream } from "node:stream/web";
+import supertest, { SuperTest, Test } from "supertest";
+import { describe, it, expect, beforeEach, vi } from "vitest";
+import {
+ createApp,
+ App,
+ toNodeListener,
+ eventHandler,
+ sendIterable,
+} from "../src";
+import { serializeIterableValue } from "../src/utils/internal/iteratable";
+
+describe("iteratable", () => {
+ let app: App;
+ let request: SuperTest;
+
+ beforeEach(() => {
+ app = createApp({ debug: false });
+ request = supertest(toNodeListener(app));
+ });
+
+ describe("serializeIterableValue", () => {
+ const exampleDate: Date = new Date(Date.UTC(2015, 6, 21, 3, 24, 54, 888));
+ it.each([
+ { value: "Hello, world!", output: "Hello, world!" },
+ { value: 123, output: "123" },
+ { value: 1n, output: "1" },
+ { value: true, output: "true" },
+ { value: false, output: "false" },
+ { value: undefined, output: undefined },
+ { value: null, output: "null" },
+ { value: exampleDate, output: JSON.stringify(exampleDate) },
+ { value: { field: 1 }, output: '{"field":1}' },
+ { value: [1, 2, 3], output: "[1,2,3]" },
+ { value: () => {}, output: undefined },
+ {
+ value: Buffer.from("Hello, world!"),
+ output: Buffer.from("Hello, world!"),
+ },
+ { value: Uint8Array.from([1, 2, 3]), output: Uint8Array.from([1, 2, 3]) },
+ ])("$value => $output", ({ value, output }) => {
+ const serialized = serializeIterableValue(value);
+ expect(serialized).toStrictEqual(output);
+ });
+ });
+
+ describe("sendIterable", () => {
+ it("sends empty body for an empty iterator", async () => {
+ app.use(eventHandler((event) => sendIterable(event, [])));
+ const result = await request.get("/");
+ expect(result.header["content-length"]).toBe("0");
+ expect(result.text).toBe("");
+ });
+
+ it("concatenates iterated values", async () => {
+ app.use(eventHandler((event) => sendIterable(event, ["a", "b", "c"])));
+ const result = await request.get("/");
+ expect(result.text).toBe("abc");
+ });
+
+ describe("iterable support", () => {
+ it.each([
+ { type: "Array", iterable: ["the-value"] },
+ { type: "Set", iterable: new Set(["the-value"]) },
+ {
+ type: "Map.keys()",
+ iterable: new Map([["the-value", "unused"]]).keys(),
+ },
+ {
+ type: "Map.values()",
+ iterable: new Map([["unused", "the-value"]]).values(),
+ },
+ {
+ type: "Iterator object",
+ iterable: { next: () => ({ value: "the-value", done: true }) },
+ },
+ {
+ type: "AsyncIterator object",
+ iterable: {
+ next: () => Promise.resolve({ value: "the-value", done: true }),
+ },
+ },
+ {
+ type: "Generator (yield)",
+ iterable: (function* () {
+ yield "the-value";
+ })(),
+ },
+ {
+ type: "Generator (return)",
+ iterable: (function* () {
+ return "the-value";
+ })(),
+ },
+ {
+ type: "Generator (yield*)",
+ iterable: (function* () {
+ // prettier-ignore
+ yield * ["the-value"];
+ })(),
+ },
+ {
+ type: "AsyncGenerator",
+ iterable: (async function* () {
+ await Promise.resolve();
+ yield "the-value";
+ })(),
+ },
+ {
+ type: "ReadableStream (push-mode)",
+ iterable: new ReadableStream({
+ start(controller) {
+ controller.enqueue("the-value");
+ controller.close();
+ },
+ }),
+ },
+ {
+ type: "ReadableStream (pull-mode)",
+ iterable: new ReadableStream({
+ pull(controller) {
+ controller.enqueue("the-value");
+ controller.close();
+ },
+ }),
+ },
+ ])("$type", async ({ iterable }) => {
+ app.use(eventHandler((event) => sendIterable(event, iterable)));
+ const response = await request.get("/");
+ expect(response.text).toBe("the-value");
+ });
+ });
+
+ describe("serializer argument", () => {
+ it("is called for every value", async () => {
+ const iterable = [1, "2", { field: 3 }, null];
+ const serializer = vi.fn(() => "x");
+
+ app.use(
+ eventHandler((event) =>
+ sendIterable(event, iterable, { serializer }),
+ ),
+ );
+ const response = await request.get("/");
+ expect(response.text).toBe("x".repeat(iterable.length));
+ expect(serializer).toBeCalledTimes(4);
+ for (const [i, obj] of iterable.entries()) {
+ expect.soft(serializer).toHaveBeenNthCalledWith(i + 1, obj);
+ }
+ });
+ });
+ });
+});
diff --git a/test/utils.test.ts b/test/utils.test.ts
index 5eb61402..e4511efa 100644
--- a/test/utils.test.ts
+++ b/test/utils.test.ts
@@ -1,5 +1,6 @@
+import { ReadableStream } from "node:stream/web";
import supertest, { SuperTest, Test } from "supertest";
-import { describe, it, expect, beforeEach } from "vitest";
+import { describe, it, expect, beforeEach, vi } from "vitest";
import {
createApp,
App,
@@ -13,7 +14,9 @@ import {
readFormData,
getRequestIP,
getRequestFingerprint,
+ sendIterable,
} from "../src";
+import { serializeIterableValue } from "../src/utils/internal/iteratable";
describe("", () => {
let app: App;
@@ -36,6 +39,138 @@ describe("", () => {
});
});
+ describe("serializeIterableValue", () => {
+ const exampleDate: Date = new Date(Date.UTC(2015, 6, 21, 3, 24, 54, 888));
+ it.each([
+ { value: "Hello, world!", output: "Hello, world!" },
+ { value: 123, output: "123" },
+ { value: 1n, output: "1" },
+ { value: true, output: "true" },
+ { value: false, output: "false" },
+ { value: undefined, output: undefined },
+ { value: null, output: "null" },
+ { value: exampleDate, output: JSON.stringify(exampleDate) },
+ { value: { field: 1 }, output: '{"field":1}' },
+ { value: [1, 2, 3], output: "[1,2,3]" },
+ { value: () => {}, output: undefined },
+ {
+ value: Buffer.from("Hello, world!"),
+ output: Buffer.from("Hello, world!"),
+ },
+ { value: Uint8Array.from([1, 2, 3]), output: Uint8Array.from([1, 2, 3]) },
+ ])("$value => $output", ({ value, output }) => {
+ const serialized = serializeIterableValue(value);
+ expect(serialized).toStrictEqual(output);
+ });
+ });
+
+ describe("sendIterable", () => {
+ it("sends empty body for an empty iterator", async () => {
+ app.use(eventHandler((event) => sendIterable(event, [])));
+ const result = await request.get("/");
+ expect(result.header["content-length"]).toBe("0");
+ expect(result.text).toBe("");
+ });
+
+ it("concatenates iterated values", async () => {
+ app.use(eventHandler((event) => sendIterable(event, ["a", "b", "c"])));
+ const result = await request.get("/");
+ expect(result.text).toBe("abc");
+ });
+
+ describe("iterable support", () => {
+ it.each([
+ { type: "Array", iterable: ["the-value"] },
+ { type: "Set", iterable: new Set(["the-value"]) },
+ {
+ type: "Map.keys()",
+ iterable: new Map([["the-value", "unused"]]).keys(),
+ },
+ {
+ type: "Map.values()",
+ iterable: new Map([["unused", "the-value"]]).values(),
+ },
+ {
+ type: "Iterator object",
+ iterable: { next: () => ({ value: "the-value", done: true }) },
+ },
+ {
+ type: "AsyncIterator object",
+ iterable: {
+ next: () => Promise.resolve({ value: "the-value", done: true }),
+ },
+ },
+ {
+ type: "Generator (yield)",
+ iterable: (function* () {
+ yield "the-value";
+ })(),
+ },
+ {
+ type: "Generator (return)",
+ iterable: (function* () {
+ return "the-value";
+ })(),
+ },
+ {
+ type: "Generator (yield*)",
+ iterable: (function* () {
+ // prettier-ignore
+ yield * ["the-value"];
+ })(),
+ },
+ {
+ type: "AsyncGenerator",
+ iterable: (async function* () {
+ await Promise.resolve();
+ yield "the-value";
+ })(),
+ },
+ {
+ type: "ReadableStream (push-mode)",
+ iterable: new ReadableStream({
+ start(controller) {
+ controller.enqueue("the-value");
+ controller.close();
+ },
+ }),
+ },
+ {
+ type: "ReadableStream (pull-mode)",
+ iterable: new ReadableStream({
+ pull(controller) {
+ controller.enqueue("the-value");
+ controller.close();
+ },
+ }),
+ },
+ ])("$type", async ({ iterable }) => {
+ app.use(eventHandler((event) => sendIterable(event, iterable)));
+ const response = await request.get("/");
+ expect(response.text).toBe("the-value");
+ });
+ });
+
+ describe("serializer argument", () => {
+ it("is called for every value", async () => {
+ const iterable = [1, "2", { field: 3 }, null];
+ const serializer = vi.fn(() => "x");
+
+ app.use(
+ eventHandler((event) =>
+ sendIterable(event, iterable, { serializer }),
+ ),
+ );
+ const response = await request.get("/");
+ expect(response.text).toBe("x".repeat(iterable.length));
+ expect(serializer).toBeCalledTimes(4);
+ for (const [i, obj] of iterable.entries()) {
+ expect.soft(serializer).toHaveBeenNthCalledWith(i + 1, obj);
+ }
+ });
+ });
+ });
+
describe("useBase", () => {
it("can prefix routes", async () => {
app.use(