Skip to content

Commit

Permalink
feat: improve support for fetch and web-streams in Node.js (#1256)
Browse files Browse the repository at this point in the history
* feat: improve fetch-http-handler compatibility in Node.js

* add changeset

* unit tests

* formatting

* Update packages/fetch-http-handler/README.md

Co-authored-by: Trivikram Kamat <[email protected]>

* Update packages/node-http-handler/src/stream-collector/index.ts

Co-authored-by: Trivikram Kamat <[email protected]>

* test: modify conditional unit tests

---------

Co-authored-by: Trivikram Kamat <[email protected]>
  • Loading branch information
kuhe and trivikr authored May 8, 2024
1 parent 671aa70 commit 3500f34
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 17 deletions.
6 changes: 6 additions & 0 deletions .changeset/happy-monkeys-tap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@smithy/node-http-handler": minor
"@smithy/util-stream": minor
---

handle web streams in streamCollector and sdkStreamMixin
7 changes: 7 additions & 0 deletions packages/fetch-http-handler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,10 @@

[![NPM version](https://img.shields.io/npm/v/@smithy/fetch-http-handler/latest.svg)](https://www.npmjs.com/package/@smithy/fetch-http-handler)
[![NPM downloads](https://img.shields.io/npm/dm/@smithy/fetch-http-handler.svg)](https://www.npmjs.com/package/@smithy/fetch-http-handler)

This is the default `requestHandler` used for browser applications.
Since Node.js introduced experimental Web Streams API in v16.5.0 and made it stable in v21.0.0,
you can consider using `fetch-http-handler` in Node.js, although it's not recommended.

For the Node.js default `requestHandler` implementation, see instead
[`@smithy/node-http-handler`](https://www.npmjs.com/package/@smithy/node-http-handler).
5 changes: 5 additions & 0 deletions packages/node-http-handler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@

[![NPM version](https://img.shields.io/npm/v/@smithy/node-http-handler/latest.svg)](https://www.npmjs.com/package/@smithy/node-http-handler)
[![NPM downloads](https://img.shields.io/npm/dm/@smithy/node-http-handler.svg)](https://www.npmjs.com/package/@smithy/node-http-handler)

This package implements the default `requestHandler` for Node.js using `node:http`, `node:https`, and `node:http2`.

For an example on how `requestHandler`s are used by Smithy generated SDK clients, refer to
the [AWS SDK for JavaScript (v3) supplemental docs](https://github.com/aws/aws-sdk-js-v3/blob/main/supplemental-docs/CLIENTS.md#request-handler-requesthandler).
14 changes: 14 additions & 0 deletions packages/node-http-handler/src/stream-collector/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ describe("streamCollector", () => {
expect(collectedData).toEqual(expected);
});

it("accepts ReadableStream if the global web stream implementation exists in Node.js", async () => {
if (typeof ReadableStream === "function") {
const data = await streamCollector(
new ReadableStream({
start(controller) {
controller.enqueue(Buffer.from("abcd"));
controller.close();
},
})
);
expect(Buffer.from(data)).toEqual(Buffer.from("abcd"));
}
});

it("will propagate errors from the stream", async () => {
// stream should emit an error right away
const mockReadStream = new ReadFromBuffers({
Expand Down
36 changes: 34 additions & 2 deletions packages/node-http-handler/src/stream-collector/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import { StreamCollector } from "@smithy/types";
import { Readable } from "stream";
import type { ReadableStream as IReadableStream } from "stream/web";

import { Collector } from "./collector";

export const streamCollector: StreamCollector = (stream: Readable): Promise<Uint8Array> =>
new Promise((resolve, reject) => {
export const streamCollector: StreamCollector = (stream: Readable | IReadableStream): Promise<Uint8Array> => {
if (isReadableStreamInstance(stream)) {
// Web stream API in Node.js
return collectReadableStream(stream);
}
return new Promise((resolve, reject) => {
const collector = new Collector();
stream.pipe(collector);
stream.on("error", (err) => {
Expand All @@ -18,3 +23,30 @@ export const streamCollector: StreamCollector = (stream: Readable): Promise<Uint
resolve(bytes);
});
});
};

/**
* Note: the global.ReadableStream object is marked experimental, and was added in v18.0.0 of Node.js.
* The importable version was added in v16.5.0. We only test for the global version so as not to
* enforce an import on a Node.js version that may not have it, and import
* only the type from stream/web.
*/
const isReadableStreamInstance = (stream: unknown): stream is IReadableStream =>
typeof ReadableStream === "function" && stream instanceof ReadableStream;

async function collectReadableStream(stream: IReadableStream): Promise<Uint8Array> {
let res = new Uint8Array(0);
const reader = stream.getReader();
let isDone = false;
while (!isDone) {
const { done, value } = await reader.read();
if (value) {
const prior = res;
res = new Uint8Array(prior.length + value.length);
res.set(prior);
res.set(value, prior.length);
}
isDone = done;
}
return res;
}
12 changes: 6 additions & 6 deletions packages/util-stream/src/sdk-stream-mixin.browser.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe(sdkStreamMixin.name, () => {
for (const method of transformMethods) {
try {
await sdkStream[method]();
fail(new Error("expect subsequent tranform to fail"));
fail(new Error("expect subsequent transform to fail"));
} catch (error) {
expect(error.message).toContain("The stream has already been transformed");
}
Expand Down Expand Up @@ -64,7 +64,7 @@ describe(sdkStreamMixin.name, () => {
sdkStreamMixin({});
fail("expect unexpected stream to fail");
} catch (e) {
expect(e.message).toContain("nexpected stream implementation");
expect(e.message).toContain("unexpected stream implementation");
global.Blob = originalBlobCtr;
}
});
Expand All @@ -77,7 +77,7 @@ describe(sdkStreamMixin.name, () => {
expect(byteArray).toEqual(mockStreamCollectorReturn);
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(payloadStream);
await sdkStream.transformToByteArray();
await expectAllTransformsToFail(sdkStream);
Expand Down Expand Up @@ -137,7 +137,7 @@ describe(sdkStreamMixin.name, () => {
}
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(payloadStream);
await sdkStream.transformToString();
await expectAllTransformsToFail(sdkStream);
Expand All @@ -152,7 +152,7 @@ describe(sdkStreamMixin.name, () => {
expect(transformed).toBe(payloadStream);
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const payloadStream = new ReadableStream();
const sdkStream = sdkStreamMixin(payloadStream as any);
sdkStream.transformToWebStream();
Expand Down Expand Up @@ -212,7 +212,7 @@ describe(sdkStreamMixin.name, () => {
}
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const payloadStream = new Blob();
const sdkStream = sdkStreamMixin(payloadStream as any);
sdkStream.transformToWebStream();
Expand Down
25 changes: 20 additions & 5 deletions packages/util-stream/src/sdk-stream-mixin.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe(sdkStreamMixin.name, () => {
for (const method of transformMethods) {
try {
await sdkStream[method]();
fail(new Error("expect subsequent tranform to fail"));
fail(new Error("expect subsequent transform to fail"));
} catch (error) {
expect(error.message).toContain("The stream has already been transformed");
}
Expand All @@ -39,6 +39,21 @@ describe(sdkStreamMixin.name, () => {
passThrough = new PassThrough();
});

it("should attempt to use the ReadableStream version if the input is not a Readable", async () => {
if (typeof ReadableStream !== "undefined") {
// ReadableStream is global only as of Node.js 18.
const sdkStream = sdkStreamMixin(
new ReadableStream({
start(controller) {
controller.enqueue(Buffer.from("abcd"));
controller.close();
},
})
);
expect(await sdkStream.transformToByteArray()).toEqual(new Uint8Array([97, 98, 99, 100]));
}
});

it("should throw if unexpected stream implementation is supplied", () => {
try {
const payload = {};
Expand All @@ -58,7 +73,7 @@ describe(sdkStreamMixin.name, () => {
expect(await sdkStream.transformToByteArray()).toEqual(expected);
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("abc")]);
expect(await sdkStream.transformToByteArray()).toEqual(byteArrayFromBuffer(Buffer.from("abc")));
Expand Down Expand Up @@ -108,7 +123,7 @@ describe(sdkStreamMixin.name, () => {
}
);

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("foo")]);
await sdkStream.transformToString();
Expand Down Expand Up @@ -164,14 +179,14 @@ describe(sdkStreamMixin.name, () => {
Readable.toWeb = originalToWebImpl;
});

it("should tranform Node stream to web stream", async () => {
it("should transform Node stream to web stream", async () => {
const sdkStream = sdkStreamMixin(passThrough);
sdkStream.transformToWebStream();
// @ts-expect-error
expect(Readable.toWeb).toBeCalled();
});

it("should fail any subsequent tranform calls", async () => {
it("should fail any subsequent transform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("foo")]);
await sdkStream.transformToWebStream();
Expand Down
17 changes: 13 additions & 4 deletions packages/util-stream/src/sdk-stream-mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@ import { fromArrayBuffer } from "@smithy/util-buffer-from";
import { Readable } from "stream";
import { TextDecoder } from "util";

import { sdkStreamMixin as sdkStreamMixinReadableStream } from "./sdk-stream-mixin.browser";

const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed.";

/**
* The function that mixes in the utility functions to help consuming runtime-specific payload stream.
*
* @internal
*/
export const sdkStreamMixin = (stream: unknown): SdkStream<Readable> => {
export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob> | SdkStream<Readable> => {
if (!(stream instanceof Readable)) {
// @ts-ignore
const name = stream?.__proto__?.constructor?.name || stream;
throw new Error(`Unexpected stream implementation, expect Stream.Readable instance, got ${name}`);
try {
/**
* If the stream is not node:stream::Readable, it may be a web stream within Node.js.
*/
return sdkStreamMixinReadableStream(stream);
} catch (e: unknown) {
// @ts-ignore
const name = stream?.__proto__?.constructor?.name || stream;
throw new Error(`Unexpected stream implementation, expect Stream.Readable instance, got ${name}`);
}
}

let transformed = false;
Expand Down

0 comments on commit 3500f34

Please sign in to comment.