Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve support for fetch and web-streams in Node.js #1256

Merged
merged 7 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/happy-monkeys-tap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@smithy/node-http-handler": minor
"@smithy/util-stream": minor
---

handle web streams in streamCollector and sdkStreamMixin
7 changes: 7 additions & 0 deletions packages/fetch-http-handler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,10 @@

[![NPM version](https://img.shields.io/npm/v/@smithy/fetch-http-handler/latest.svg)](https://www.npmjs.com/package/@smithy/fetch-http-handler)
[![NPM downloads](https://img.shields.io/npm/dm/@smithy/fetch-http-handler.svg)](https://www.npmjs.com/package/@smithy/fetch-http-handler)

This is the default `requestHandler` used for browser applications.
Since Node.js introduced experimental Web Streams API in v16.5.0 and made it stable in v21.0.0,
you can consider using `fetch-http-handler` in Node.js, although it's not recommended.

For the Node.js default `requestHandler` implementation, see instead
[`@smithy/node-http-handler`](https://www.npmjs.com/package/@smithy/node-http-handler).
5 changes: 5 additions & 0 deletions packages/node-http-handler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@

[![NPM version](https://img.shields.io/npm/v/@smithy/node-http-handler/latest.svg)](https://www.npmjs.com/package/@smithy/node-http-handler)
[![NPM downloads](https://img.shields.io/npm/dm/@smithy/node-http-handler.svg)](https://www.npmjs.com/package/@smithy/node-http-handler)

This package implements the default `requestHandler` for Node.js using `node:http`, `node:https`, and `node:http2`.

For an example on how `requestHandler`s are used by Smithy generated SDK clients, refer to
the [AWS SDK for JavaScript (v3) supplemental docs](https://github.com/aws/aws-sdk-js-v3/blob/main/supplemental-docs/CLIENTS.md#request-handler-requesthandler).
14 changes: 14 additions & 0 deletions packages/node-http-handler/src/stream-collector/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ describe("streamCollector", () => {
expect(collectedData).toEqual(expected);
});

it("accepts ReadableStream if the global web stream implementation exists in Node.js", async () => {
if (typeof ReadableStream === "function") {
const data = await streamCollector(
new ReadableStream({
start(controller) {
controller.enqueue(Buffer.from("abcd"));
controller.close();
},
})
);
expect(Buffer.from(data)).toEqual(Buffer.from("abcd"));
}
});

it("will propagate errors from the stream", async () => {
// stream should emit an error right away
const mockReadStream = new ReadFromBuffers({
Expand Down
36 changes: 34 additions & 2 deletions packages/node-http-handler/src/stream-collector/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import { StreamCollector } from "@smithy/types";
import { Readable } from "stream";
import type { ReadableStream as IReadableStream } from "stream/web";

import { Collector } from "./collector";

export const streamCollector: StreamCollector = (stream: Readable): Promise<Uint8Array> =>
new Promise((resolve, reject) => {
export const streamCollector: StreamCollector = (stream: Readable | IReadableStream): Promise<Uint8Array> => {
if (isReadableStreamInstance(stream)) {
// Web stream API in Node.js
return collectReadableStream(stream);
}
return new Promise((resolve, reject) => {
const collector = new Collector();
stream.pipe(collector);
stream.on("error", (err) => {
Expand All @@ -18,3 +23,30 @@ export const streamCollector: StreamCollector = (stream: Readable): Promise<Uint
resolve(bytes);
});
});
};

/**
* Note: the global.ReadableStream object is marked experimental, and was added in v18.0.0 of Node.js.
* The importable version was added in v16.5.0. We only test for the global version so as not to
* enforce an import on a Node.js version that may not have it, and import
* only the type from stream/web.
*/
const isReadableStreamInstance = (stream: unknown): stream is IReadableStream =>
typeof ReadableStream === "function" && stream instanceof ReadableStream;

async function collectReadableStream(stream: IReadableStream): Promise<Uint8Array> {
let res = new Uint8Array(0);
const reader = stream.getReader();
let isDone = false;
while (!isDone) {
const { done, value } = await reader.read();
if (value) {
const prior = res;
res = new Uint8Array(prior.length + value.length);
res.set(prior);
res.set(value, prior.length);
}
isDone = done;
}
return res;
}
12 changes: 6 additions & 6 deletions packages/util-stream/src/sdk-stream-mixin.browser.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe(sdkStreamMixin.name, () => {
for (const method of transformMethods) {
try {
await sdkStream[method]();
fail(new Error("expect subsequent tranform to fail"));
fail(new Error("expect subsequent transform to fail"));
} catch (error) {
expect(error.message).toContain("The stream has already been transformed");
}
Expand Down Expand Up @@ -64,7 +64,7 @@ describe(sdkStreamMixin.name, () => {
sdkStreamMixin({});
fail("expect unexpected stream to fail");
} catch (e) {
expect(e.message).toContain("nexpected stream implementation");
expect(e.message).toContain("unexpected stream implementation");
global.Blob = originalBlobCtr;
}
});
Expand All @@ -77,7 +77,7 @@ describe(sdkStreamMixin.name, () => {
expect(byteArray).toEqual(mockStreamCollectorReturn);
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(payloadStream);
await sdkStream.transformToByteArray();
await expectAllTransformsToFail(sdkStream);
Expand Down Expand Up @@ -137,7 +137,7 @@ describe(sdkStreamMixin.name, () => {
}
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(payloadStream);
await sdkStream.transformToString();
await expectAllTransformsToFail(sdkStream);
Expand All @@ -152,7 +152,7 @@ describe(sdkStreamMixin.name, () => {
expect(transformed).toBe(payloadStream);
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const payloadStream = new ReadableStream();
const sdkStream = sdkStreamMixin(payloadStream as any);
sdkStream.transformToWebStream();
Expand Down Expand Up @@ -212,7 +212,7 @@ describe(sdkStreamMixin.name, () => {
}
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const payloadStream = new Blob();
const sdkStream = sdkStreamMixin(payloadStream as any);
sdkStream.transformToWebStream();
Expand Down
25 changes: 20 additions & 5 deletions packages/util-stream/src/sdk-stream-mixin.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe(sdkStreamMixin.name, () => {
for (const method of transformMethods) {
try {
await sdkStream[method]();
fail(new Error("expect subsequent tranform to fail"));
fail(new Error("expect subsequent transform to fail"));
} catch (error) {
expect(error.message).toContain("The stream has already been transformed");
}
Expand All @@ -39,6 +39,21 @@ describe(sdkStreamMixin.name, () => {
passThrough = new PassThrough();
});

it("should attempt to use the ReadableStream version if the input is not a Readable", async () => {
if (typeof ReadableStream !== "undefined") {
// ReadableStream is global only as of Node.js 18.
const sdkStream = sdkStreamMixin(
new ReadableStream({
start(controller) {
controller.enqueue(Buffer.from("abcd"));
controller.close();
},
})
);
expect(await sdkStream.transformToByteArray()).toEqual(new Uint8Array([97, 98, 99, 100]));
}
});

it("should throw if unexpected stream implementation is supplied", () => {
try {
const payload = {};
Expand All @@ -58,7 +73,7 @@ describe(sdkStreamMixin.name, () => {
expect(await sdkStream.transformToByteArray()).toEqual(expected);
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("abc")]);
expect(await sdkStream.transformToByteArray()).toEqual(byteArrayFromBuffer(Buffer.from("abc")));
Expand Down Expand Up @@ -108,7 +123,7 @@ describe(sdkStreamMixin.name, () => {
}
);

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("foo")]);
await sdkStream.transformToString();
Expand Down Expand Up @@ -164,14 +179,14 @@ describe(sdkStreamMixin.name, () => {
Readable.toWeb = originalToWebImpl;
});

it("should tranform Node stream to web stream", async () => {
it("should transform Node stream to web stream", async () => {
const sdkStream = sdkStreamMixin(passThrough);
sdkStream.transformToWebStream();
// @ts-expect-error
expect(Readable.toWeb).toBeCalled();
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("foo")]);
await sdkStream.transformToWebStream();
Expand Down
17 changes: 13 additions & 4 deletions packages/util-stream/src/sdk-stream-mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@ import { fromArrayBuffer } from "@smithy/util-buffer-from";
import { Readable } from "stream";
import { TextDecoder } from "util";

import { sdkStreamMixin as sdkStreamMixinReadableStream } from "./sdk-stream-mixin.browser";

const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed.";

/**
* The function that mixes in the utility functions to help consuming runtime-specific payload stream.
*
* @internal
*/
export const sdkStreamMixin = (stream: unknown): SdkStream<Readable> => {
export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob> | SdkStream<Readable> => {
if (!(stream instanceof Readable)) {
// @ts-ignore
const name = stream?.__proto__?.constructor?.name || stream;
throw new Error(`Unexpected stream implementation, expect Stream.Readable instance, got ${name}`);
try {
/**
* If the stream is not node:stream::Readable, it may be a web stream within Node.js.
*/
return sdkStreamMixinReadableStream(stream);
} catch (e: unknown) {
// @ts-ignore
const name = stream?.__proto__?.constructor?.name || stream;
throw new Error(`Unexpected stream implementation, expect Stream.Readable instance, got ${name}`);
}
}

let transformed = false;
Expand Down
Loading