diff --git a/packages/rstream/src/forkjoin.ts b/packages/rstream/src/forkjoin.ts index 3821dbdc7e..1bcfccda63 100644 --- a/packages/rstream/src/forkjoin.ts +++ b/packages/rstream/src/forkjoin.ts @@ -1,5 +1,10 @@ import { Fn, Fn3 } from "@thi.ng/api"; -import { comp, map, range } from "@thi.ng/transducers"; +import { + comp, + map, + mapcat, + range +} from "@thi.ng/transducers"; import { CommonOpts, ITransformable } from "./api"; import { sync } from "./stream-sync"; import { tunnel } from "./subs/tunnel"; @@ -48,6 +53,10 @@ export interface ForkJoinOpts extends Partial { * `navigator.hardwareConcurrency` (or if unavailable, then 4 as * fallback). If setting this higher, be aware of browser limits and * potential resulting crashes! + * + * If using multiple `forkJoin`s concurrently, it's the user's + * responsibility to ensure that the total number of workers won't + * exceed the browser limit (Chrome/FF ~20). */ numWorkers?: number; /** @@ -68,6 +77,9 @@ export interface ForkJoinOpts extends Partial { /** * If given and greater than zero, all workers will be terminated * after given period (in millis) after the parent stream is done. + * If used, this value MUST be higher than the expected processing + * time of the worker jobs, in order to guarantee that the last + * values are processed fully. * * Default: 1000 */ @@ -129,3 +141,68 @@ export const forkJoin = ( backPressure: opts.backPressure }); }; + +/** + * Higher-order fork function for scenarios involving the split-parallel + * processing of a large buffer. The returned function is meant to be + * used as `fork` function in a `ForkJoinOpts` config and extracts a + * workload slice of the original buffer for a single worker. The HOF + * itself takes a minimum chunk size as optional parameter (default: 1). 
+ * + * **Note:** Depending on the configured `minChunkSize` and the size of + * the input buffer to be partitioned, the returned fork function might + * produce empty sub-arrays for some workers, iff the configured number + * of workers exceeds the resulting number of chunks / input values. + * E.g. If the number of workers = 8, buffer size = 10 and min chunk + * size = 2, then the last 3 (i.e. 8 - 10 / 2) workers will only receive + * empty workloads. + * + * More generally, if the input buffer size is not equally divisible + * over the given number of workers, the last worker might receive a + * larger or smaller chunk. + * + * ``` + * forkJoin({ + * src, + * // job definition / split buffer into chunks (min size 256 values) + * fork: forkBuffer(256), + * // re-join partial results + * join: joinBuffer(), + * worker: "./worker.js", + * }) + * ``` + * + * @see forkJoin + * @see joinBuffer + * + * @param minChunkSize - lower bound for the chunk size used when slicing the buffer (default: 1) + */ +export const forkBuffer = (minChunkSize = 1) => ( + id: number, + numWorkers: number, + buf: T[] /* NOTE(review): `T` is not declared anywhere in view; the generic parameter list of the returned function was presumably lost - confirm against the original file */ +) => { + const chunkSize = Math.max(minChunkSize, (buf.length / numWorkers) | 0); /* truncated per-worker share of the buffer, raised to minChunkSize if smaller */ + return id < numWorkers - 1 /* all workers except the last get one chunkSize-wide slice; the last gets everything from its offset to the end */ + ? buf.slice(id * chunkSize, (id + 1) * chunkSize) + : buf.slice(id * chunkSize); +}; + +/** + * Higher-order join function for scenarios involving the split-parallel + * processing of a large buffer. The returned function is meant to be + * used as `join` function in a `ForkJoinOpts` config; it receives the + * processed result chunks from all workers and concatenates them back + * into a single result array. + * + * The optional `fn` arg can be used to pick the actual result chunk + * from each worker result. This is useful if the worker result type is + * not an array and includes other data points (e.g. execution + * metrics). If `fn` is not given, it defaults to the identity function + * (i.e. each worker's result is assumed to be an array). 
+ * + * @param fn - picks the result chunk out of a single worker result (default: identity function) + */ +export const joinBuffer = >( + fn: Fn = (x) => x /* NOTE(review): the tokens before `>(` and the type arguments of `Fn` appear to be missing (`A` is undeclared in view) - confirm against the original file */ +) => (parts: A[]) => /* apply `fn` to every part and collect the concatenated results into one new array */ [...mapcat(fn, parts)];