forked from smithy-lang/smithy-typescript
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(util-stream): splitStream and headStream utilities (smithy-lang#…
…1336) * feat(util-stream): add stream splitting function * variable naming * DRY isReadableStream typeguard * update type guard * rename type guard file * lint
- Loading branch information
Showing
11 changed files
with
300 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"@smithy/util-stream": minor | ||
--- | ||
|
||
add splitStream and headStream utilities |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/** | ||
* @internal | ||
* @param stream | ||
* @param bytes - read head bytes from the stream and discard the rest of it. | ||
* | ||
* Caution: the input stream must be destroyed separately, this function does not do so. | ||
*/ | ||
export async function headStream(stream: ReadableStream, bytes: number): Promise<Uint8Array> { | ||
let byteLengthCounter = 0; | ||
const chunks = []; | ||
const reader = stream.getReader(); | ||
let isDone = false; | ||
|
||
while (!isDone) { | ||
const { done, value } = await reader.read(); | ||
if (value) { | ||
chunks.push(value); | ||
byteLengthCounter += value?.byteLength ?? 0; | ||
} | ||
if (byteLengthCounter >= bytes) { | ||
break; | ||
} | ||
isDone = done; | ||
} | ||
reader.releaseLock(); | ||
|
||
const collected = new Uint8Array(Math.min(bytes, byteLengthCounter)); | ||
let offset = 0; | ||
for (const chunk of chunks) { | ||
if (chunk.byteLength > collected.byteLength - offset) { | ||
collected.set(chunk.subarray(0, collected.byteLength - offset), offset); | ||
break; | ||
} else { | ||
collected.set(chunk, offset); | ||
} | ||
offset += chunk.length; | ||
} | ||
return collected; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
import { Readable } from "stream"; | ||
|
||
import { headStream } from "./headStream"; | ||
import { headStream as headWebStream } from "./headStream.browser"; | ||
import { splitStream } from "./splitStream"; | ||
import { splitStream as splitWebStream } from "./splitStream.browser"; | ||
|
||
const CHUNK_SIZE = 4; | ||
const a32 = "abcd".repeat(32_000 / CHUNK_SIZE); | ||
const a16 = "abcd".repeat(16_000 / CHUNK_SIZE); | ||
const a8 = "abcd".repeat(8); | ||
const a4 = "abcd".repeat(4); | ||
const a2 = "abcd".repeat(2); | ||
const a1 = "abcd".repeat(1); | ||
|
||
describe(headStream.name, () => { | ||
it("should collect the head of a Node.js stream", async () => { | ||
const data = Buffer.from(a32); | ||
const myStream = Readable.from(data); | ||
|
||
const head = await headStream(myStream, 16_000); | ||
|
||
expect(Buffer.from(head).toString()).toEqual(a16); | ||
}); | ||
|
||
it("should collect the head of a web stream", async () => { | ||
if (typeof ReadableStream !== "undefined") { | ||
const buffer = Buffer.from(a32); | ||
const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength)); | ||
|
||
const myStream = new ReadableStream({ | ||
start(controller) { | ||
for (const inputChunk of data) { | ||
controller.enqueue(new Uint8Array([inputChunk])); | ||
} | ||
controller.close(); | ||
}, | ||
}); | ||
|
||
const head = await headWebStream(myStream, 16_000); | ||
expect(Buffer.from(head).toString()).toEqual(a16); | ||
} | ||
}); | ||
}); | ||
|
||
describe("splitStream and headStream integration", () => { | ||
it("should split and head streams for Node.js", async () => { | ||
const data = Buffer.from(a32); | ||
const myStream = Readable.from(data); | ||
|
||
const [a, _1] = await splitStream(myStream); | ||
const [b, _2] = await splitStream(_1); | ||
const [c, _3] = await splitStream(_2); | ||
const [d, _4] = await splitStream(_3); | ||
const [e, f] = await splitStream(_4); | ||
|
||
const byteArr1 = await headStream(a, Infinity); | ||
const byteArr2 = await headStream(b, 16_000); | ||
const byteArr3 = await headStream(c, 8 * CHUNK_SIZE); | ||
const byteArr4 = await headStream(d, 4 * CHUNK_SIZE); | ||
const byteArr5 = await headStream(e, 2 * CHUNK_SIZE); | ||
const byteArr6 = await headStream(f, CHUNK_SIZE); | ||
|
||
await Promise.all([a, b, c, d, e, f].map((stream) => stream.destroy())); | ||
|
||
expect(Buffer.from(byteArr1).toString()).toEqual(a32); | ||
expect(Buffer.from(byteArr2).toString()).toEqual(a16); | ||
expect(Buffer.from(byteArr3).toString()).toEqual(a8); | ||
expect(Buffer.from(byteArr4).toString()).toEqual(a4); | ||
expect(Buffer.from(byteArr5).toString()).toEqual(a2); | ||
expect(Buffer.from(byteArr6).toString()).toEqual(a1); | ||
}); | ||
|
||
it("should split and head streams for web streams API", async () => { | ||
if (typeof ReadableStream !== "undefined") { | ||
const buffer = Buffer.from(a8); | ||
const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength)); | ||
|
||
const myStream = new ReadableStream({ | ||
start(controller) { | ||
for (let i = 0; i < data.length; i += CHUNK_SIZE) { | ||
controller.enqueue(new Uint8Array(data.slice(i, i + CHUNK_SIZE))); | ||
} | ||
controller.close(); | ||
}, | ||
}); | ||
|
||
const [a, _1] = await splitWebStream(myStream); | ||
const [b, _2] = await splitWebStream(_1); | ||
const [c, _3] = await splitWebStream(_2); | ||
const [d, e] = await splitWebStream(_3); | ||
|
||
const byteArr1 = await headWebStream(a, Infinity); | ||
const byteArr2 = await headWebStream(b, 8 * CHUNK_SIZE); | ||
const byteArr3 = await headWebStream(c, 4 * CHUNK_SIZE); | ||
const byteArr4 = await headWebStream(d, 2 * CHUNK_SIZE); | ||
const byteArr5 = await headWebStream(e, CHUNK_SIZE); | ||
|
||
await Promise.all([a, b, c, d, e].map((stream) => stream.cancel())); | ||
|
||
expect(Buffer.from(byteArr1).toString()).toEqual(a8); | ||
expect(Buffer.from(byteArr2).toString()).toEqual(a8); | ||
expect(Buffer.from(byteArr3).toString()).toEqual(a4); | ||
expect(Buffer.from(byteArr4).toString()).toEqual(a2); | ||
expect(Buffer.from(byteArr5).toString()).toEqual(a1); | ||
} | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import { Readable, Writable } from "stream"; | ||
|
||
import { headStream as headWebStream } from "./headStream.browser"; | ||
import { isReadableStream } from "./stream-type-check"; | ||
|
||
/** | ||
* @internal | ||
* @param stream | ||
* @param bytes - read head bytes from the stream and discard the rest of it. | ||
* | ||
* Caution: the input stream must be destroyed separately, this function does not do so. | ||
*/ | ||
export const headStream = (stream: Readable | ReadableStream, bytes: number): Promise<Uint8Array> => { | ||
if (isReadableStream(stream)) { | ||
return headWebStream(stream, bytes); | ||
} | ||
return new Promise((resolve, reject) => { | ||
const collector = new Collector(); | ||
collector.limit = bytes; | ||
stream.pipe(collector); | ||
stream.on("error", (err) => { | ||
collector.end(); | ||
reject(err); | ||
}); | ||
collector.on("error", reject); | ||
collector.on("finish", function (this: Collector) { | ||
const bytes = new Uint8Array(Buffer.concat(this.buffers)); | ||
resolve(bytes); | ||
}); | ||
}); | ||
}; | ||
|
||
class Collector extends Writable { | ||
public readonly buffers: Buffer[] = []; | ||
public limit = Infinity; | ||
private bytesBuffered = 0; | ||
|
||
_write(chunk: Buffer, encoding: string, callback: (err?: Error) => void) { | ||
this.buffers.push(chunk); | ||
this.bytesBuffered += chunk.byteLength ?? 0; | ||
if (this.bytesBuffered >= this.limit) { | ||
const excess = this.bytesBuffered - this.limit; | ||
const tailBuffer = this.buffers[this.buffers.length - 1]; | ||
this.buffers[this.buffers.length - 1] = tailBuffer.subarray(0, tailBuffer.byteLength - excess); | ||
this.emit("finish"); | ||
} | ||
callback(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,6 @@ | ||
export * from "./blob/Uint8ArrayBlobAdapter"; | ||
export * from "./getAwsChunkedEncodingStream"; | ||
export * from "./sdk-stream-mixin"; | ||
export * from "./splitStream"; | ||
export * from "./headStream"; | ||
export * from "./stream-type-check"; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
/** | ||
* @param stream | ||
* @returns stream split into two identical streams. | ||
*/ | ||
export async function splitStream(stream: ReadableStream | Blob): Promise<[ReadableStream, ReadableStream]> { | ||
if (typeof (stream as Blob).stream === "function") { | ||
stream = (stream as Blob).stream(); | ||
} | ||
const readableStream = stream as ReadableStream; | ||
return readableStream.tee(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import { streamCollector as webStreamCollector } from "@smithy/fetch-http-handler"; | ||
import { streamCollector } from "@smithy/node-http-handler"; | ||
import { Readable } from "stream"; | ||
|
||
import { splitStream } from "./splitStream"; | ||
import { splitStream as splitWebStream } from "./splitStream.browser"; | ||
|
||
describe(splitStream.name, () => { | ||
it("should split a node:Readable stream", async () => { | ||
const data = Buffer.from("abcd"); | ||
|
||
const myStream = Readable.from(data); | ||
const [a, b] = await splitStream(myStream); | ||
|
||
const buffer1 = await streamCollector(a); | ||
const buffer2 = await streamCollector(b); | ||
|
||
expect(buffer1).toEqual(new Uint8Array([97, 98, 99, 100])); | ||
expect(buffer1).toEqual(buffer2); | ||
}); | ||
it("should split a web:ReadableStream stream", async () => { | ||
if (typeof ReadableStream !== "undefined") { | ||
const inputChunks = [97, 98, 99, 100]; | ||
|
||
const myStream = new ReadableStream({ | ||
start(controller) { | ||
for (const inputChunk of inputChunks) { | ||
controller.enqueue(new Uint8Array([inputChunk])); | ||
} | ||
controller.close(); | ||
}, | ||
}); | ||
|
||
const [a, b] = await splitWebStream(myStream); | ||
|
||
const bytes1 = await webStreamCollector(a); | ||
const bytes2 = await webStreamCollector(b); | ||
|
||
expect(bytes1).toEqual(new Uint8Array([97, 98, 99, 100])); | ||
expect(bytes1).toEqual(bytes2); | ||
} | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import type { Readable } from "stream"; | ||
import { PassThrough } from "stream"; | ||
|
||
import { splitStream as splitWebStream } from "./splitStream.browser"; | ||
import { isReadableStream } from "./stream-type-check"; | ||
|
||
/** | ||
* @param stream | ||
* @returns stream split into two identical streams. | ||
*/ | ||
export async function splitStream(stream: Readable): Promise<[Readable, Readable]>; | ||
export async function splitStream(stream: ReadableStream): Promise<[ReadableStream, ReadableStream]>; | ||
export async function splitStream( | ||
stream: Readable | ReadableStream | ||
): Promise<[Readable | ReadableStream, Readable | ReadableStream]> { | ||
if (isReadableStream(stream)) { | ||
return splitWebStream(stream); | ||
} | ||
const stream1 = new PassThrough(); | ||
const stream2 = new PassThrough(); | ||
stream.pipe(stream1); | ||
stream.pipe(stream2); | ||
return [stream1, stream2]; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
/** | ||
* @internal | ||
*/ | ||
export const isReadableStream = (stream: unknown): stream is ReadableStream => | ||
typeof ReadableStream === "function" && | ||
(stream?.constructor?.name === ReadableStream.name || stream instanceof ReadableStream); |