diff --git a/.changeset/happy-monkeys-tap.md b/.changeset/happy-monkeys-tap.md new file mode 100644 index 00000000000..4d9c35a8d09 --- /dev/null +++ b/.changeset/happy-monkeys-tap.md @@ -0,0 +1,6 @@ +--- +"@smithy/node-http-handler": minor +"@smithy/util-stream": minor +--- + +handle web streams in streamCollector and sdkStreamMixin diff --git a/packages/fetch-http-handler/README.md b/packages/fetch-http-handler/README.md index 05df3aaa3b3..e52e8f13de1 100644 --- a/packages/fetch-http-handler/README.md +++ b/packages/fetch-http-handler/README.md @@ -2,3 +2,10 @@ [![NPM version](https://img.shields.io/npm/v/@smithy/fetch-http-handler/latest.svg)](https://www.npmjs.com/package/@smithy/fetch-http-handler) [![NPM downloads](https://img.shields.io/npm/dm/@smithy/fetch-http-handler.svg)](https://www.npmjs.com/package/@smithy/fetch-http-handler) + +This is the default `requestHandler` used for browser applications. +Since Node.js introduced experimental Web Streams API in v16.5.0 and made it stable in v21.0.0, +you can consider using `fetch-http-handler` in Node.js, although it's not recommended. + +For the Node.js default `requestHandler` implementation, see instead +[`@smithy/node-http-handler`](https://www.npmjs.com/package/@smithy/node-http-handler). diff --git a/packages/node-http-handler/README.md b/packages/node-http-handler/README.md index 4063e6706a3..214719f3f52 100644 --- a/packages/node-http-handler/README.md +++ b/packages/node-http-handler/README.md @@ -2,3 +2,8 @@ [![NPM version](https://img.shields.io/npm/v/@smithy/node-http-handler/latest.svg)](https://www.npmjs.com/package/@smithy/node-http-handler) [![NPM downloads](https://img.shields.io/npm/dm/@smithy/node-http-handler.svg)](https://www.npmjs.com/package/@smithy/node-http-handler) + +This package implements the default `requestHandler` for Node.js using `node:http`, `node:https`, and `node:http2`. + +For an example on how `requestHandler`s are used by Smithy generated SDK clients, refer to +the [AWS SDK for JavaScript (v3) supplemental docs](https://github.com/aws/aws-sdk-js-v3/blob/main/supplemental-docs/CLIENTS.md#request-handler-requesthandler). diff --git a/packages/node-http-handler/src/stream-collector/index.spec.ts b/packages/node-http-handler/src/stream-collector/index.spec.ts index f9e7c32a7c4..fb958f0f6ea 100644 --- a/packages/node-http-handler/src/stream-collector/index.spec.ts +++ b/packages/node-http-handler/src/stream-collector/index.spec.ts @@ -12,6 +12,20 @@ describe("streamCollector", () => { expect(collectedData).toEqual(expected); }); + it("accepts ReadableStream if the global web stream implementation exists in Node.js", async () => { + if (typeof ReadableStream === "function") { + const data = await streamCollector( + new ReadableStream({ + start(controller) { + controller.enqueue(Buffer.from("abcd")); + controller.close(); + }, + }) + ); + expect(Buffer.from(data)).toEqual(Buffer.from("abcd")); + } + }); + it("will propagate errors from the stream", async () => { // stream should emit an error right away const mockReadStream = new ReadFromBuffers({ diff --git a/packages/node-http-handler/src/stream-collector/index.ts b/packages/node-http-handler/src/stream-collector/index.ts index f48e64a4ea7..26d3e9dad92 100644 --- a/packages/node-http-handler/src/stream-collector/index.ts +++ b/packages/node-http-handler/src/stream-collector/index.ts @@ -1,10 +1,15 @@ import { StreamCollector } from "@smithy/types"; import { Readable } from "stream"; +import type { ReadableStream as IReadableStream } from "stream/web"; import { Collector } from "./collector"; -export const streamCollector: StreamCollector = (stream: Readable): Promise => - new Promise((resolve, reject) => { +export const streamCollector: StreamCollector = (stream: Readable | IReadableStream): Promise => { + if (isReadableStreamInstance(stream)) { + // Web stream API in Node.js + return collectReadableStream(stream); + } + return new Promise((resolve, reject) => { const collector = new Collector(); stream.pipe(collector); stream.on("error", (err) => { @@ -18,3 +23,30 @@ export const streamCollector: StreamCollector = (stream: Readable): Promise + typeof ReadableStream === "function" && stream instanceof ReadableStream; + +async function collectReadableStream(stream: IReadableStream): Promise { + let res = new Uint8Array(0); + const reader = stream.getReader(); + let isDone = false; + while (!isDone) { + const { done, value } = await reader.read(); + if (value) { + const prior = res; + res = new Uint8Array(prior.length + value.length); + res.set(prior); + res.set(value, prior.length); + } + isDone = done; + } + return res; +} diff --git a/packages/util-stream/src/sdk-stream-mixin.browser.spec.ts b/packages/util-stream/src/sdk-stream-mixin.browser.spec.ts index 48c5527c58f..f3d7b896f01 100644 --- a/packages/util-stream/src/sdk-stream-mixin.browser.spec.ts +++ b/packages/util-stream/src/sdk-stream-mixin.browser.spec.ts @@ -25,7 +25,7 @@ describe(sdkStreamMixin.name, () => { for (const method of transformMethods) { try { await sdkStream[method](); - fail(new Error("expect subsequent tranform to fail")); + fail(new Error("expect subsequent transform to fail")); } catch (error) { expect(error.message).toContain("The stream has already been transformed"); } @@ -64,7 +64,7 @@ describe(sdkStreamMixin.name, () => { sdkStreamMixin({}); fail("expect unexpected stream to fail"); } catch (e) { - expect(e.message).toContain("nexpected stream implementation"); + expect(e.message).toContain("unexpected stream implementation"); global.Blob = originalBlobCtr; } }); @@ -77,7 +77,7 @@ describe(sdkStreamMixin.name, () => { expect(byteArray).toEqual(mockStreamCollectorReturn); }); - it("should fail any subsequent tranform calls", async () => { + it("should fail any subsequent transform calls", async () => { const sdkStream = sdkStreamMixin(payloadStream); await sdkStream.transformToByteArray(); await expectAllTransformsToFail(sdkStream); @@ -137,7 +137,7 @@ describe(sdkStreamMixin.name, () => { } }); - it("should fail any subsequent tranform calls", async () => { + it("should fail any subsequent transform calls", async () => { const sdkStream = sdkStreamMixin(payloadStream); await sdkStream.transformToString(); await expectAllTransformsToFail(sdkStream); @@ -152,7 +152,7 @@ describe(sdkStreamMixin.name, () => { expect(transformed).toBe(payloadStream); }); - it("should fail any subsequent tranform calls", async () => { + it("should fail any subsequent transform calls", async () => { const payloadStream = new ReadableStream(); const sdkStream = sdkStreamMixin(payloadStream as any); sdkStream.transformToWebStream(); @@ -212,7 +212,7 @@ describe(sdkStreamMixin.name, () => { } }); - it("should fail any subsequent tranform calls", async () => { + it("should fail any subsequent transform calls", async () => { const payloadStream = new Blob(); const sdkStream = sdkStreamMixin(payloadStream as any); sdkStream.transformToWebStream(); diff --git a/packages/util-stream/src/sdk-stream-mixin.spec.ts b/packages/util-stream/src/sdk-stream-mixin.spec.ts index 9f8db07a5aa..beff40a87cb 100644 --- a/packages/util-stream/src/sdk-stream-mixin.spec.ts +++ b/packages/util-stream/src/sdk-stream-mixin.spec.ts @@ -28,7 +28,7 @@ describe(sdkStreamMixin.name, () => { for (const method of transformMethods) { try { await sdkStream[method](); - fail(new Error("expect subsequent tranform to fail")); + fail(new Error("expect subsequent transform to fail")); } catch (error) { expect(error.message).toContain("The stream has already been transformed"); } @@ -39,6 +39,21 @@ describe(sdkStreamMixin.name, () => { passThrough = new PassThrough(); }); + it("should attempt to use the ReadableStream version if the input is not a Readable", async () => { + if (typeof ReadableStream !== "undefined") { + // ReadableStream is global only as of Node.js 18. + const sdkStream = sdkStreamMixin( + new ReadableStream({ + start(controller) { + controller.enqueue(Buffer.from("abcd")); + controller.close(); + }, + }) + ); + expect(await sdkStream.transformToByteArray()).toEqual(new Uint8Array([97, 98, 99, 100])); + } + }); + it("should throw if unexpected stream implementation is supplied", () => { try { const payload = {}; @@ -58,7 +73,7 @@ describe(sdkStreamMixin.name, () => { expect(await sdkStream.transformToByteArray()).toEqual(expected); }); - it("should fail any subsequent tranform calls", async () => { + it("should fail any subsequent transform calls", async () => { const sdkStream = sdkStreamMixin(passThrough); await writeDataToStream(passThrough, [Buffer.from("abc")]); expect(await sdkStream.transformToByteArray()).toEqual(byteArrayFromBuffer(Buffer.from("abc"))); @@ -108,7 +123,7 @@ describe(sdkStreamMixin.name, () => { } ); - it("should fail any subsequent tranform calls", async () => { + it("should fail any subsequent transform calls", async () => { const sdkStream = sdkStreamMixin(passThrough); await writeDataToStream(passThrough, [Buffer.from("foo")]); await sdkStream.transformToString(); @@ -164,14 +179,14 @@ describe(sdkStreamMixin.name, () => { Readable.toWeb = originalToWebImpl; }); - it("should tranform Node stream to web stream", async () => { + it("should transform Node stream to web stream", async () => { const sdkStream = sdkStreamMixin(passThrough); sdkStream.transformToWebStream(); // @ts-expect-error expect(Readable.toWeb).toBeCalled(); }); - it("should fail any subsequent tranform calls", async () => { + it("should fail any subsequent transform calls", async () => { const sdkStream = sdkStreamMixin(passThrough); await writeDataToStream(passThrough, [Buffer.from("foo")]); await sdkStream.transformToWebStream(); diff --git a/packages/util-stream/src/sdk-stream-mixin.ts b/packages/util-stream/src/sdk-stream-mixin.ts index e8ef11be2c4..99adb59c605 100644 --- a/packages/util-stream/src/sdk-stream-mixin.ts +++ b/packages/util-stream/src/sdk-stream-mixin.ts @@ -4,6 +4,8 @@ import { fromArrayBuffer } from "@smithy/util-buffer-from"; import { Readable } from "stream"; import { TextDecoder } from "util"; +import { sdkStreamMixin as sdkStreamMixinReadableStream } from "./sdk-stream-mixin.browser"; + const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed."; /** @@ -11,11 +13,18 @@ const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transfo * * @internal */ -export const sdkStreamMixin = (stream: unknown): SdkStream => { +export const sdkStreamMixin = (stream: unknown): SdkStream | SdkStream => { if (!(stream instanceof Readable)) { - // @ts-ignore - const name = stream?.__proto__?.constructor?.name || stream; - throw new Error(`Unexpected stream implementation, expect Stream.Readable instance, got ${name}`); + try { + /** + * If the stream is not node:stream::Readable, it may be a web stream within Node.js. + */ + return sdkStreamMixinReadableStream(stream); + } catch (e: unknown) { + // @ts-ignore + const name = stream?.__proto__?.constructor?.name || stream; + throw new Error(`Unexpected stream implementation, expect Stream.Readable instance, got ${name}`); + } } let transformed = false;