-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreadable_stream.ts
44 lines (42 loc) · 1.22 KB
/
readable_stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import type { WorkerLike } from "./types.ts";
/**
* Options for creating a readable stream from a worker.
*/
export type ReadableStreamFromWorkerOptions = {
/**
* The queuing strategy to create the `ReadableStream` with.
*/
strategy?: QueuingStrategy<Uint8Array>;
};
/**
* Creates a readable stream that reads data from a worker's `postMessage` event.
*
* @param worker The worker to read data from.
* @param options Options for creating the readable stream.
* @returns A readable stream that can be used to read the data.
*/
export function readableStreamFromWorker(
worker: WorkerLike,
options: ReadableStreamFromWorkerOptions = {},
): ReadableStream<Uint8Array> {
const {
strategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 }),
} = options;
return new ReadableStream({
start(controller) {
worker.onmessage = (ev) => {
if (ev.data === null) {
controller.close();
worker.onmessage = () => undefined;
} else if (ev.data instanceof Uint8Array) {
controller.enqueue(ev.data);
} else {
throw new Error("Unexpected data posted");
}
};
},
cancel() {
worker.onmessage = () => undefined;
},
}, strategy);
}