Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance improvement #27

Merged
merged 7 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 68 additions & 53 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,79 @@
[![Test](https://github.com/lambdalisue/deno-workerio/actions/workflows/test.yml/badge.svg)](https://github.com/lambdalisue/deno-workerio/actions/workflows/test.yml)

[Deno][deno] module to translate Worker's system of messages into
[Reader][reader] and [Writer][writer].

This module supports Deno v1.28.0 or later.
[`ReadableStream<Uint8Array>`][readablestream] and
[`WritableStream<Uint8Array>`][writablestream] or [`Deno.Reader`][reader] and
[`Deno.Writer`][writer].

[deno]: https://deno.land/
[reader]: https://doc.deno.land/builtin/stable#Deno.Reader
[writer]: https://doc.deno.land/builtin/stable#Deno.Writer
[ReadableStream]: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
[WritableStream]: https://developer.mozilla.org/en-US/docs/Web/API/WritableStream

## Example

### Server
### ReadableStream/WritableStream

#### Server

```typescript
import {
readableStreamFromWorker,
writableStreamFromWorker,
} from "https://deno.land/x/workerio/mod.ts";

const decoder = new TextDecoder();
const encoder = new TextEncoder();

const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
type: "module",
});

const reader = readableStreamFromWorker(worker);
const writer = writableStreamFromWorker(worker);
const w = writer.getWriter();

await w.write(encoder.encode("Hello"));
await w.write(encoder.encode("World"));
w.releaseLock();

for await (const data of reader) {
const text = decoder.decode(data);
console.log(text);
}
```

#### Worker

```typescript
import {
readableStreamFromWorker,
writableStreamFromWorker,
} from "https://deno.land/x/workerio/mod.ts";

const decoder = new TextDecoder();
const encoder = new TextEncoder();

async function main(): Promise<void> {
const worker = self as unknown as Worker;
const reader = readableStreamFromWorker(worker);
const writer = writableStreamFromWorker(worker);
const w = writer.getWriter();

for await (const data of reader) {
const text = decoder.decode(data);
await w.write(encoder.encode(`!!! ${text} !!!`));
}
w.releaseLock();
}

main().catch((e) => console.error(e));
```

### Deno.Reader/Deno.Writer

#### Server

```typescript
import {
Expand All @@ -42,7 +104,7 @@ for await (const data of Deno.iter(reader)) {
}
```

### Worker
#### Worker

```typescript
import {
Expand All @@ -54,8 +116,7 @@ const decoder = new TextDecoder();
const encoder = new TextEncoder();

async function main(): Promise<void> {
// deno-lint-ignore no-explicit-any
const worker = self as any;
const worker = self as unknown as Worker;
const reader = new WorkerReader(worker);
const writer = new WorkerWriter(worker);

Expand All @@ -68,52 +129,6 @@ async function main(): Promise<void> {
main().catch((e) => console.error(e));
```

## Benchmark

You can run benchmark of `WorkerReader` and `WorkerWriter` with the following
command:

```
$ deno run --no-check --allow-read --allow-net ./benchmark/benchmark.ts
===========================================================
Transfer: 1 MiB
N: 5 times
===========================================================
Relaxing 1 sec ...
Start benchmark
1 Reader: 9 [ms] Writer: 0 [ms]
2 Reader: 7 [ms] Writer: 0 [ms]
3 Reader: 6 [ms] Writer: 1 [ms]
4 Reader: 5 [ms] Writer: 1 [ms]
5 Reader: 6 [ms] Writer: 0 [ms]
===========================================================
Reader: Avg. 6.6000 msec (1271.0 Mbps)
Writer: Avg. 0.40000 msec (20972 Mbps)
===========================================================
```

Use `-n` to change the number of tries and `-size` to the size of the buffer (in
MB) like:

```
$ deno run --no-check --allow-read --allow-net ./benchmark/benchmark.ts -n 3 --size 8
===========================================================
Transfer: 8 MiB
N: 3 times
===========================================================
Relaxing 1 sec ...
Start benchmark
1 Reader: 53 [ms] Writer: 7 [ms]
2 Reader: 43 [ms] Writer: 1 [ms]
3 Reader: 40 [ms] Writer: 1 [ms]
===========================================================
Reader: Avg. 45.333 msec (1480.3 Mbps)
Writer: Avg. 3.0000 msec (22370 Mbps)
===========================================================
```

See [Benchmark](./wiki/Benchmark) for various benchmarks.

## License

The code follows MIT license written in [LICENSE](./LICENSE). Contributors need
Expand Down
2 changes: 1 addition & 1 deletion example/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as streams from "https://deno.land/std@0.185.0/streams/mod.ts";
import * as streams from "https://deno.land/std@0.186.0/streams/mod.ts";
import { WorkerReader, WorkerWriter } from "../mod.ts";

const decoder = new TextDecoder();
Expand Down
2 changes: 1 addition & 1 deletion example/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as streams from "https://deno.land/std@0.185.0/streams/mod.ts";
import * as streams from "https://deno.land/std@0.186.0/streams/mod.ts";
import { WorkerReader, WorkerWriter } from "../mod.ts";

const decoder = new TextDecoder();
Expand Down
50 changes: 9 additions & 41 deletions readable_stream.ts
Original file line number Diff line number Diff line change
@@ -1,74 +1,42 @@
const DEFAULT_CHUNK_SIZE = 16 * 1024;

/**
* Options for creating a readable stream from a worker.
*/
export interface ReadableStreamFromWorkerOptions {
/**
* The size of chunks to allocate to read. The default is ~16KiB, which is
* the maximum size that Deno operations can currently support.
*/
chunkSize?: number;

export type ReadableStreamFromWorkerOptions = {
/**
* The queuing strategy to create the `ReadableStream` with.
*/
strategy?: {
/**
* A number representing the total number of bytes that can be stored in the
* stream's internal queue before backpressure is applied.
*
* If not specified, it defaults to 1.
*/
highWaterMark?: number | undefined;
/**
* This value should always be left undefined as the stream's underlying
* source is the worker.
*/
size?: undefined;
};
}
strategy?: QueuingStrategy<Uint8Array>;
};

/**
* Creates a readable stream that reads data from a worker's `postMessage` event.
*
* @param worker The worker to read data from.
* @param options The options to configure the behavior of the stream. Defaults to
* 16 KiB chunk size and a queuing strategy with highWaterMark of 1 and undefined size.
* @param options Options for creating the readable stream.
* @returns A readable stream that can be used to read the data.
*/
export function readableStreamFromWorker(
worker: Worker,
options: ReadableStreamFromWorkerOptions = {},
): ReadableStream<Uint8Array> {
const {
chunkSize = DEFAULT_CHUNK_SIZE,
strategy,
strategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 }),
} = options;
let onmessage: (e: MessageEvent<Uint8Array | null>) => void;
return new ReadableStream({
start(controller) {
onmessage = (ev) => {
worker.onmessage = (ev) => {
if (ev.data === null) {
worker.removeEventListener("message", onmessage);
controller.close();
return;
worker.onmessage = undefined;
} else if (ev.data instanceof Uint8Array) {
const data = ev.data;
let offset = 0;
while (offset < data.length) {
const end = offset + chunkSize;
controller.enqueue(data.subarray(offset, end));
offset = end;
}
controller.enqueue(ev.data);
} else {
throw new Error("Unexpected data posted");
}
};
worker.addEventListener("message", onmessage);
},
cancel() {
worker.removeEventListener("message", onmessage);
worker.onmessage = undefined;
},
}, strategy);
}
25 changes: 14 additions & 11 deletions readable_stream_bench.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { assertEquals } from "https://deno.land/[email protected]/testing/asserts.ts";
import { concat } from "https://deno.land/[email protected]/bytes/mod.ts";
import * as streams from "https://deno.land/[email protected]/streams/mod.ts";
import { assertEquals } from "https://deno.land/[email protected]/testing/asserts.ts";
import { readableStreamFromWorker, WorkerReader } from "./mod.ts";
import { MockWorker } from "./test_util.ts";

Expand All @@ -14,8 +12,6 @@ const sizes = [
];

for (const size of sizes) {
const data = new Uint8Array(size);

Deno.bench(
`readableStreamFromWorker (${
size.toString().padStart(2)
Expand All @@ -28,15 +24,15 @@ for (const size of sizes) {
const worker = new MockWorker();
const rstream = readableStreamFromWorker(worker);
for (let i = 0; i < count; i++) {
const data = new Uint8Array(size);
worker.postMessage(data);
}
worker.postMessage(null);
const chunks = [];
let total = 0;
for await (const chunk of rstream) {
chunks.push(chunk);
total += chunk.length;
}
const content = concat(...chunks);
assertEquals(content.length, size * count);
assertEquals(total, size * count);
},
);

Expand All @@ -49,11 +45,18 @@ for (const size of sizes) {
const worker = new MockWorker();
const reader = new WorkerReader(worker);
for (let i = 0; i < count; i++) {
const data = new Uint8Array(size);
worker.postMessage(data);
}
worker.postMessage(null);
const content = await streams.readAll(reader);
assertEquals(content.length, size * count);
let total = 0;
while (true) {
const p = new Uint8Array(1024);
const n = await reader.read(p);
if (n === null) break;
total += n;
}
assertEquals(total, size * count);
},
);
}
4 changes: 2 additions & 2 deletions readable_stream_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { assertEquals } from "https://deno.land/std@0.185.0/testing/asserts.ts";
import { concat } from "https://deno.land/std@0.185.0/bytes/mod.ts";
import { assertEquals } from "https://deno.land/std@0.186.0/testing/asserts.ts";
import { concat } from "https://deno.land/std@0.186.0/bytes/mod.ts";
import { readableStreamFromWorker } from "./mod.ts";
import { MockWorker } from "./test_util.ts";

Expand Down
Loading