From f24e69ea22c2929171ddcd840f2ff6e9fcb20224 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Fri, 23 Nov 2018 11:03:05 +0000 Subject: [PATCH] refactor(rstream): move tunnel to /subs, add docs --- packages/rstream/src/index.ts | 2 +- packages/rstream/src/subs/tunnel.ts | 100 ++++++++++++++++++++++++++++ packages/rstream/src/tunnel.ts | 66 ------------------ 3 files changed, 101 insertions(+), 67 deletions(-) create mode 100644 packages/rstream/src/subs/tunnel.ts delete mode 100644 packages/rstream/src/tunnel.ts diff --git a/packages/rstream/src/index.ts b/packages/rstream/src/index.ts index b6d5e803e0..007933fc25 100644 --- a/packages/rstream/src/index.ts +++ b/packages/rstream/src/index.ts @@ -5,7 +5,6 @@ export * from "./stream-merge"; export * from "./stream-sync"; export * from "./subscription"; export * from "./trigger"; -export * from "./tunnel"; export * from "./from/atom"; export * from "./from/event"; @@ -24,5 +23,6 @@ export * from "./subs/sidechain-partition"; export * from "./subs/sidechain-toggle"; export * from "./subs/trace"; export * from "./subs/transduce"; +export * from "./subs/tunnel"; export * from "./utils/worker"; diff --git a/packages/rstream/src/subs/tunnel.ts b/packages/rstream/src/subs/tunnel.ts new file mode 100644 index 0000000000..b228a07026 --- /dev/null +++ b/packages/rstream/src/subs/tunnel.ts @@ -0,0 +1,100 @@ +import { DEBUG, State } from "../api"; +import { Subscription } from "../subscription"; +import { makeWorker } from "../utils/worker"; + +export interface TunnelOpts { + /** + * Tunnelled worker instance, source blob or script URL. + * If `interrupt` is enabled, the worker MUST be given as blob or URL. + */ + src: Worker | Blob | string; + /** + * Optional subscription ID to use. + */ + id?: string; + /** + * Optional function to extract transferables from incoming stream + * values, e.g. ArrayBuffers. See: + * https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage + */ + transferables?: (x: A) => any[]; + /** + * If given and greater than zero, the worker will be terminated + * after given period (in millis) after the parent stream is done. + * + * Default: 0 + */ + terminate?: number; + /** + * If true, the worker will be terminated and restarted for each new + * stream value. This is useful to avoid executing extraneous work + * and ensures only the most rececent stream value is being processed. + * + * Default: false + */ + interrupt?: boolean; +} + +/** + * Creates a new worker `Tunnel` instance with given options. This + * subscription type processes received values via the configured worker + * and then passes any values received back from the worker on to + * downstream subscriptions, thereby allowing workers to be used + * transparently for stream processing. + * + * @param opts + */ +export const tunnel = (opts: TunnelOpts) => + new Tunnel(opts); + +export class Tunnel extends Subscription { + + worker: Worker; + src: Worker | Blob | string; + transferables: (x: A) => any[]; + terminate: number; + interrupt: boolean; + + constructor(opts: TunnelOpts) { + super(null, null, null, opts.id || `tunnel-${Subscription.NEXT_ID++}`); + this.src = opts.src; + this.transferables = opts.transferables; + this.terminate = opts.terminate || 0; + this.interrupt = opts.interrupt; + } + + next(x: A) { + if (this.state < State.DONE) { + let tx; + if (this.transferables) { + tx = this.transferables(x); + } + if (this.interrupt && this.worker) { + this.worker.terminate(); + this.worker = null; + } + if (!this.worker) { + this.worker = makeWorker(this.src); + this.worker.addEventListener( + "message", + (e: MessageEvent) => this.dispatch(e.data) + ); + this.worker.addEventListener( + "error", + (e: ErrorEvent) => this.error(e) + ); + } + this.worker.postMessage(x, tx); + } + } + + done() { + super.done(); + if (this.terminate > 0) { + setTimeout(() => { + DEBUG && console.log("terminating worker..."); + this.worker.terminate(); + }, this.terminate); + } + } +} diff --git a/packages/rstream/src/tunnel.ts b/packages/rstream/src/tunnel.ts deleted file mode 100644 index 6dfc8aa05b..0000000000 --- a/packages/rstream/src/tunnel.ts +++ /dev/null @@ -1,66 +0,0 @@ -import { DEBUG, State } from "./api"; -import { Subscription } from "./subscription"; -import { makeWorker } from "./utils/worker"; - -export interface TunnelOpts { - src: Worker | Blob | string; - id?: string; - transferrables?: (x: A) => any[]; - terminate?: number; - interrupt?: boolean; -} - -export const tunnel = (opts: TunnelOpts) => - new Tunnel(opts); - -export class Tunnel extends Subscription { - - opts: TunnelOpts; - worker: Worker; - - constructor(opts: TunnelOpts) { - super(null, null, null, opts.id || `tunnel-${Subscription.NEXT_ID++}`); - this.opts = { - terminate: 0, - interrupt: false, - ...opts - }; - } - - next(x: A) { - if (this.state < State.DONE) { - const opts = this.opts; - let tx; - if (opts.transferrables) { - tx = opts.transferrables(x); - } - if (opts.interrupt && this.worker) { - this.worker.terminate(); - this.worker = null; - } - if (!this.worker) { - this.worker = makeWorker(opts.src); - this.worker.addEventListener( - "message", - (e: MessageEvent) => this.dispatch(e.data) - ); - this.worker.addEventListener( - "error", - (e: ErrorEvent) => this.error(e) - ); - } - this.worker.postMessage(x, tx); - } - } - - done() { - super.done(); - const terminate = this.opts.terminate; - if (terminate > 0) { - setTimeout(() => { - DEBUG && console.log("terminating worker..."); - this.worker.terminate(); - }, terminate); - } - } -} \ No newline at end of file