Skip to content

Commit

Permalink
refactor(rstream): move tunnel to /subs, add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Nov 23, 2018
1 parent a39b98c commit f24e69e
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 67 deletions.
2 changes: 1 addition & 1 deletion packages/rstream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ export * from "./stream-merge";
export * from "./stream-sync";
export * from "./subscription";
export * from "./trigger";
export * from "./tunnel";

export * from "./from/atom";
export * from "./from/event";
Expand All @@ -24,5 +23,6 @@ export * from "./subs/sidechain-partition";
export * from "./subs/sidechain-toggle";
export * from "./subs/trace";
export * from "./subs/transduce";
export * from "./subs/tunnel";

export * from "./utils/worker";
100 changes: 100 additions & 0 deletions packages/rstream/src/subs/tunnel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { DEBUG, State } from "../api";
import { Subscription } from "../subscription";
import { makeWorker } from "../utils/worker";

export interface TunnelOpts<A> {
/**
* Tunnelled worker instance, source blob or script URL.
* If `interrupt` is enabled, the worker MUST be given as blob or URL.
*/
src: Worker | Blob | string;
/**
* Optional subscription ID to use.
*/
id?: string;
/**
* Optional function to extract transferables from incoming stream
* values, e.g. ArrayBuffers. See:
* https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage
*/
transferables?: (x: A) => any[];
/**
* If given and greater than zero, the worker will be terminated
* after given period (in millis) after the parent stream is done.
*
* Default: 0
*/
terminate?: number;
/**
* If true, the worker will be terminated and restarted for each new
* stream value. This is useful to avoid executing extraneous work
* and ensures only the most rececent stream value is being processed.
*
* Default: false
*/
interrupt?: boolean;
}

/**
* Creates a new worker `Tunnel` instance with given options. This
* subscription type processes received values via the configured worker
* and then passes any values received back from the worker on to
* downstream subscriptions, thereby allowing workers to be used
* transparently for stream processing.
*
* @param opts
*/
export const tunnel = <A, B>(opts: TunnelOpts<A>) =>
new Tunnel<A, B>(opts);

export class Tunnel<A, B> extends Subscription<A, B> {

worker: Worker;
src: Worker | Blob | string;
transferables: (x: A) => any[];
terminate: number;
interrupt: boolean;

constructor(opts: TunnelOpts<A>) {
super(null, null, null, opts.id || `tunnel-${Subscription.NEXT_ID++}`);
this.src = opts.src;
this.transferables = opts.transferables;
this.terminate = opts.terminate || 0;
this.interrupt = opts.interrupt;
}

next(x: A) {
if (this.state < State.DONE) {
let tx;
if (this.transferables) {
tx = this.transferables(x);
}
if (this.interrupt && this.worker) {
this.worker.terminate();
this.worker = null;
}
if (!this.worker) {
this.worker = makeWorker(this.src);
this.worker.addEventListener(
"message",
(e: MessageEvent) => this.dispatch(e.data)
);
this.worker.addEventListener(
"error",
(e: ErrorEvent) => this.error(e)
);
}
this.worker.postMessage(x, tx);
}
}

done() {
super.done();
if (this.terminate > 0) {
setTimeout(() => {
DEBUG && console.log("terminating worker...");
this.worker.terminate();
}, this.terminate);
}
}
}
66 changes: 0 additions & 66 deletions packages/rstream/src/tunnel.ts

This file was deleted.

0 comments on commit f24e69e

Please sign in to comment.