Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify WorkerReader/WorkerWriter and add readableStreamFromWorker/writableStreamFromWorker #24

Merged
merged 8 commits into from
Apr 29, 2023
99 changes: 0 additions & 99 deletions benchmark/benchmark.ts

This file was deleted.

2 changes: 1 addition & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"tasks": {
"test": "deno test -A --parallel",
"bench": "deno run -A benchmark/benchmark.ts",
"bench": "deno bench -A",
"check": "deno check $(find . -name '*.ts')",
"upgrade": "deno run -A https://deno.land/x/udd/main.ts $(find . -name '*.ts')"
}
Expand Down
29 changes: 28 additions & 1 deletion deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion mod.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./readable_stream.ts";
export * from "./reader.ts";
export * from "./writable_stream.ts";
export * from "./writer.ts";
export * from "./types.ts";
74 changes: 74 additions & 0 deletions readable_stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
const DEFAULT_CHUNK_SIZE = 16 * 1024;

/**
* Options for creating a readable stream from a worker.
*/
export interface ReadableStreamFromWorkerOptions {
/**
* The size of chunks to allocate to read. The default is ~16KiB, which is
* the maximum size that Deno operations can currently support.
*/
chunkSize?: number;

/**
* The queuing strategy to create the `ReadableStream` with.
*/
strategy?: {
/**
* A number representing the total number of bytes that can be stored in the
* stream's internal queue before backpressure is applied.
*
* If not specified, it defaults to 1.
*/
highWaterMark?: number | undefined;
/**
* This value should always be left undefined as the stream's underlying
* source is the worker.
*/
size?: undefined;
};
}

/**
* Creates a readable stream that reads data from a worker's `postMessage` event.
*
* @param worker The worker to read data from.
* @param options The options to configure the behavior of the stream. Defaults to
* 16 KiB chunk size and a queuing strategy with highWaterMark of 1 and undefined size.
* @returns A readable stream that can be used to read the data.
*/
export function readableStreamFromWorker(
worker: Worker,
options: ReadableStreamFromWorkerOptions = {},
): ReadableStream<Uint8Array> {
const {
chunkSize = DEFAULT_CHUNK_SIZE,
strategy,
} = options;
let onmessage: (e: MessageEvent<Uint8Array | null>) => void;
return new ReadableStream({
start(controller) {
onmessage = (ev) => {
if (ev.data === null) {
worker.removeEventListener("message", onmessage);
controller.close();
return;
} else if (ev.data instanceof Uint8Array) {
const data = ev.data;
let offset = 0;
while (offset < data.length) {
const end = offset + chunkSize;
controller.enqueue(data.subarray(offset, end));
offset = end;
}
} else {
throw new Error("Unexpected data posted");
}
};
worker.addEventListener("message", onmessage);
},
cancel() {
worker.removeEventListener("message", onmessage);
},
}, strategy);
}
59 changes: 59 additions & 0 deletions readable_stream_bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { assertEquals } from "https://deno.land/[email protected]/testing/asserts.ts";
import { concat } from "https://deno.land/[email protected]/bytes/mod.ts";
import * as streams from "https://deno.land/[email protected]/streams/mod.ts";
import { readableStreamFromWorker, WorkerReader } from "./mod.ts";
import { MockWorker } from "./test_util.ts";

const count = 100;
const sizes = [
64,
128,
256,
512,
1024,
];

for (const size of sizes) {
const data = new Uint8Array(size);

Deno.bench(
`readableStreamFromWorker (${
size.toString().padStart(2)
} Bytes x ${count})`,
{
group: size.toString(),
baseline: true,
},
async () => {
const worker = new MockWorker();
const rstream = readableStreamFromWorker(worker);
for (let i = 0; i < count; i++) {
worker.postMessage(data);
}
worker.postMessage(null);
const chunks = [];
for await (const chunk of rstream) {
chunks.push(chunk);
}
const content = concat(...chunks);
assertEquals(content.length, size * count);
},
);

Deno.bench(
`WorkerReader (${size.toString().padStart(2)} Bytes x ${count})`,
{
group: size.toString(),
},
async () => {
const worker = new MockWorker();
const reader = new WorkerReader(worker);
for (let i = 0; i < count; i++) {
worker.postMessage(data);
}
worker.postMessage(null);
const content = await streams.readAll(reader);
assertEquals(content.length, size * count);
},
);
}
38 changes: 38 additions & 0 deletions readable_stream_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { assertEquals } from "https://deno.land/[email protected]/testing/asserts.ts";
import { concat } from "https://deno.land/[email protected]/bytes/mod.ts";
import { readableStreamFromWorker } from "./mod.ts";
import { MockWorker } from "./test_util.ts";

Deno.test(
"readableStreamFromWorker returns ReadableStream that yields data from worker",
async () => {
const worker = new MockWorker();
const rstream = readableStreamFromWorker(worker);
worker.postMessage(new Uint8Array([0, 1, 2, 3, 4]));
worker.postMessage(new Uint8Array([5, 6, 7, 8, 9]));
worker.postMessage(null);
const chunks = [];
for await (const chunk of rstream) {
chunks.push(chunk);
}
const content = concat(...chunks);
assertEquals(content, new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
},
);

Deno.test(
"readableStreamFromWorker returns ReadableStream that yields data from worker prior to close",
async () => {
const worker = new MockWorker();
const rstream = readableStreamFromWorker(worker);
worker.postMessage(new Uint8Array([0, 1, 2, 3, 4]));
worker.postMessage(null);
worker.postMessage(new Uint8Array([5, 6, 7, 8, 9]));
const chunks = [];
for await (const chunk of rstream) {
chunks.push(chunk);
}
const content = concat(...chunks);
assertEquals(content, new Uint8Array([0, 1, 2, 3, 4]));
},
);
Loading