Skip to content

Commit

Permalink
feat(par): Adds race and all to Task
Browse files Browse the repository at this point in the history
  • Loading branch information
kofno committed Jun 10, 2018
1 parent ccd371d commit d79a390
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions src/Task.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { resolve } from 'url';

export type Reject<E> = (err: E) => void;
export type Resolve<T> = (t: T) => void;
export type Cancel = () => void;
Expand Down Expand Up @@ -37,6 +39,68 @@ class Task<E, T> {
});
}

/**
* Creates a new task that will run a series of tasks in parallel. If any
* of the tasks reject, then all other results are discarded. If all tasks
* resolve, then the an array of results is returned.
*
* This is comparable to Promise.all
*
* Implementation is based on https://github.com/futurize/parallel-future
*/
public static all<E, T>(ts: Array<Task<E, T>>): Task<E, T[]> {
const length = ts.length;
if (length === 0) {
return Task.succeed([]);
}

return new Task((reject, resolve) => {
let resolved = 0;
const results: T[] = [];
const resolveIdx = (idx: number) => (result: T) => {
resolved = resolved + 1;
results[idx] = result;
if (resolved === length) {
resolve(results);
}
};
for (let i = 0; i < length; i++) {
ts[i].fork(reject, resolveIdx(i));
}
return noop;
});
}

/**
* Creates a new Task from an Array of Tasks. When forked, all tasks are
* forked in parallel. The first task to complete (either rejected or resolved)
* is preserved. All other results are discarded.
*
* This could be used for a simple timeout mechanism. If the timeout rejects
* before the fetch completes, you'll get a timeout error.
*
* new Task([longFetchTask, timeoutTask])
*/
public static race<T, E>(ts: Array<Task<E, T>>): Task<E, T> {
if (ts.length === 0) {
return new Task((reject, resolve) => noop);
}

return new Task((reject, resolve) => {
let resolved = false;
const resolveIf = (result: T) => {
if (!resolved) {
resolved = true;
resolve(result);
}
};
for (let i = 0; i < ts.length; i++) {
ts[i].fork(reject, resolveIf);
}
return noop;
});
}

private fn: Computation<E, T>;

constructor(computation: Computation<E, T>) {
Expand Down

0 comments on commit d79a390

Please sign in to comment.