Skip to content

Commit

Permalink
feat(rstream): add forkBuffer/joinBuffer HOFs, add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Sep 6, 2019
1 parent 05bdeec commit a35c8e8
Showing 1 changed file with 78 additions and 1 deletion.
79 changes: 78 additions & 1 deletion packages/rstream/src/forkjoin.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { Fn, Fn3 } from "@thi.ng/api";
import { comp, map, range } from "@thi.ng/transducers";
import {
comp,
map,
mapcat,
range
} from "@thi.ng/transducers";
import { CommonOpts, ITransformable } from "./api";
import { sync } from "./stream-sync";
import { tunnel } from "./subs/tunnel";
Expand Down Expand Up @@ -48,6 +53,10 @@ export interface ForkJoinOpts<IN, MSG, RES, OUT> extends Partial<CommonOpts> {
* `navigator.hardwareConcurrency` (or if unavailable, then 4 as
* fallback). If setting this higher, be aware of browser limits and
* potential resulting crashes!
*
* If using multiple `forkJoin`s concurrently, it's the user's
* responsibility to ensure that the total number of workers won't
* exceed the browser limit (Chome/FF ~20).
*/
numWorkers?: number;
/**
Expand All @@ -68,6 +77,9 @@ export interface ForkJoinOpts<IN, MSG, RES, OUT> extends Partial<CommonOpts> {
/**
* If given and greater than zero, all workers will be terminated
* after given period (in millis) after the parent stream is done.
* If used, this value MUST be higher than the expected processing
* time of the worker jobs, in order to guarantee that the last
* values are processed fully.
*
* Default: 1000
*/
Expand Down Expand Up @@ -129,3 +141,68 @@ export const forkJoin = <IN, MSG, RES, OUT>(
backPressure: opts.backPressure
});
};

/**
* Higher-order fork function for scenarios involving the split-parallel
* processing of a large buffer. The returned function is meant to be
* used as `fork` function in a `ForkJoinOpts` config and extracts a
* workload slice of the original buffer for a single worker. The HOF
* itself takes a minimum chunk size as optional parameter (default: 1).
*
* **Note:** Depending on the configured `minChunkSize` and the size of
* the input buffer to be partitioned, the returned fork function might
* produce empty sub-arrays for some workers, iff the configured number
* of workers exceeds the resulting number of chunks / input values.
* E.g. If the number of workers = 8, buffer size = 10 and min chunk
* size = 2, then the last 3 (i.e. 8 - 10 / 2) workers will only receive
* empty workloads.
*
* More generally, if the input buffer size is not equally divisible
* over the given number of workers, the last worker might receive a
* larger or smaller chunk.
*
* ```
* forkJoin<number[], number[], number[], number[]>({
* src,
* // job definition / split buffer into chunks (min size 256 values)
* fork: forkBuffer(256),
* // re-join partial results
* join: joinBuffer(),
* worker: "./worker.js",
* })
* ```
*
* @see forkJoin
* @see joinBuffer
*
* @param minChunkSize
*/
export const forkBuffer = (minChunkSize = 1) => <T>(
id: number,
numWorkers: number,
buf: T[]
) => {
const chunkSize = Math.max(minChunkSize, (buf.length / numWorkers) | 0);
return id < numWorkers - 1
? buf.slice(id * chunkSize, (id + 1) * chunkSize)
: buf.slice(id * chunkSize);
};

/**
* Higher-order join function for scenarios involving the split-parallel
* processing of a large buffer. The returned function is meant to be
* used as `join` function in a `ForkJoinOpts` config, receives the
* processed result chunks from all workers and concatenates them back
* into a single result array.
*
* The optional `fn` arg can be used to pick the actual result chunk
* from each worker result. This is useful if the worker result type is
* not an array and includes other data points (e.g. execution metrics
* etc.). If `fn` is not given, it defaults to the identity function
* (i.e. each worker's result is assumed to be an array).
*
* @param fn
*/
export const joinBuffer = <A, B extends Array<any>>(
fn: Fn<A, B> = (x) => <any>x
) => (parts: A[]) => <B>[...mapcat(fn, parts)];

0 comments on commit a35c8e8

Please sign in to comment.