diff --git a/packages/rstream/src/from/atom.ts b/packages/rstream/src/from/atom.ts index 49c5251277..3da3539c49 100644 --- a/packages/rstream/src/from/atom.ts +++ b/packages/rstream/src/from/atom.ts @@ -4,20 +4,37 @@ import { CommonOpts } from "../api"; import { Stream } from "../stream"; import { optsWithID } from "../utils/idgen"; +export interface FromAtomOpts extends CommonOpts { + /** + * True, if the current atom value should be emitted when the stream + * activates. + * + * @defaultValue true + */ + emitFirst: boolean; + /** + * User predicate to determine value changes in atom. New values are + * only emitted on stream if the predicate returns true. + */ + changed: Predicate2; +} + /** - * Yields stream of value changes in given atom / cursor. Attaches watch - * to atom and checks for value changes with given `changed` predicate - * (`!==` by default). If the predicate returns truthy result, the new - * value is emitted on the stream. If `emitFirst` is true (default), - * also emits atom's current value when first subscriber attaches to - * stream. + * Yields stream of value changes in given + * {@link @thi.ng/atom# | Atom-like state container}. * - * See: - * - fromView() - * - @thi.ng/atom + * @remarks + * Attaches a watch to the atom and checks for value changes with given + * `changed` predicate (`!==` by default). If the predicate returns + * truthy result, the new value is emitted on the stream. If `emitFirst` + * is true (default), also emits atom's current value when first + * subscriber attaches to stream. * - * ``` - * db = new Atom({a: 23, b: 88}); + * Also see {@link fromView} + * + * @example + * ```ts + * db = new Atom({ a: 23, b: 88 }); * cursor = new Cursor(db, "a") * * rs.fromAtom(cursor).subscribe(rs.trace("cursor val:")) @@ -31,22 +48,24 @@ import { optsWithID } from "../utils/idgen"; * ``` * * @param atom - * @param emitFirst - * @param changed + * @param opts */ export const fromAtom = ( atom: ReadonlyAtom, - emitFirst = true, - changed?: Predicate2, - opts?: Partial -): Stream => - new Stream((stream) => { - changed = changed || ((a, b) => a !== b); + opts?: Partial> +): Stream => { + opts = optsWithID("atom", >{ + emitFirst: true, + changed: (a, b) => a !== b, + ...opts + }); + return new Stream((stream) => { atom.addWatch(stream.id, (_, prev, curr) => { - if (changed!(prev, curr)) { + if (opts!.changed!(prev, curr)) { stream.next(curr); } }); - emitFirst && stream.next(atom.deref()); + opts!.emitFirst && stream.next(atom.deref()); return () => atom.removeWatch(stream.id); - }, optsWithID("atom", opts)); + }, opts); +}; diff --git a/packages/rstream/src/from/interval.ts b/packages/rstream/src/from/interval.ts index 120204a9f1..e2a02ce634 100644 --- a/packages/rstream/src/from/interval.ts +++ b/packages/rstream/src/from/interval.ts @@ -2,11 +2,22 @@ import { CloseMode, CommonOpts } from "../api"; import { Stream } from "../stream"; import { optsWithID } from "../utils/idgen"; +export interface FromIntervalOpts extends CommonOpts { + /** + * If given, only the stated number of values will be emitted (in + * the `[0...num)` interval) and the stream will become inactive (or + * close) after. + * + * @defaultValue Infinity + */ + num: number; +} + /** - * Returns a new `Stream` which emits a monotonically increasing counter - * value at given `delay` interval, up to an optionally defined max - * value (default: ∞), after which the stream is closed. The stream only - * starts when the first subscriber becomes available. + * Returns a new `Stream` of monotonically increasing counter values, + * emitted at given `delay` interval and up to the optionally defined + * max value (default: ∞), after which the stream is closed. The stream + * only starts when the first subscriber becomes available. * * @param delay * @param count @@ -14,11 +25,12 @@ import { optsWithID } from "../utils/idgen"; */ export const fromInterval = ( delay: number, - count = Infinity, - opts?: Partial -) => - new Stream((stream) => { + opts?: Partial +) => { + opts = optsWithID("interval", { num: Infinity, ...opts }); + return new Stream((stream) => { let i = 0; + let count = opts!.num!; stream.next(i++); let id = setInterval(() => { stream.next(i++); @@ -28,4 +40,5 @@ export const fromInterval = ( } }, delay); return () => clearInterval(id); - }, optsWithID("interval", opts)); + }, opts); +}; diff --git a/packages/rstream/src/from/iterable.ts b/packages/rstream/src/from/iterable.ts index 8e77f24b2a..ae75dbea22 100644 --- a/packages/rstream/src/from/iterable.ts +++ b/packages/rstream/src/from/iterable.ts @@ -2,6 +2,16 @@ import { CloseMode, CommonOpts } from "../api"; import { Stream } from "../stream"; import { optsWithID } from "../utils/idgen"; +export interface FromIterableOpts extends CommonOpts { + /** + * Time delay (in ms) between emitted values. The default value of + * 0, means as fast as possible (but still via `setInterval`). + * + * @defaultValue 0 + */ + delay: number; +} + /** * Creates a new `Stream` of given iterable which asynchronously calls * `.next()` for each item of the iterable when the first (and in this @@ -11,13 +21,11 @@ import { optsWithID } from "../utils/idgen"; * by default, but can be avoided by passing `false` as last argument. * * @param src - * @param delay * @param opts */ export const fromIterable = ( src: Iterable, - delay = 0, - opts?: Partial + opts: Partial = {} ) => new Stream((stream) => { const iter = src[Symbol.iterator](); @@ -29,7 +37,7 @@ export const fromIterable = ( } else { stream.next(val.value); } - }, delay); + }, opts.delay || 0); return () => clearInterval(id); }, optsWithID("iterable", opts)); diff --git a/packages/rstream/src/from/raf.ts b/packages/rstream/src/from/raf.ts index e8d1fa4c85..2e4859b4b7 100644 --- a/packages/rstream/src/from/raf.ts +++ b/packages/rstream/src/from/raf.ts @@ -5,17 +5,17 @@ import { optsWithID } from "../utils/idgen"; import { fromInterval } from "./interval"; /** - * Yields a stream of monotonically increasing counter, triggered by a - * `requestAnimationFrame()` loop (only available in browser + * Yields the stream of a monotonically increasing counter, triggered by + * a `requestAnimationFrame()` loop (only available in browser * environments). In NodeJS, this function falls back to - * `fromInterval(16)`, yielding a similar (approximately 60fps) stream. + * `fromInterval(16)`, yielding a similar (approx. 60Hz) stream. * * Subscribers to this stream will be processed during that same loop * iteration. */ export const fromRAF = (opts?: Partial) => isNode() - ? fromInterval(16, undefined, opts) + ? fromInterval(16, opts) : new Stream((stream) => { let i = 0; let isActive = true; diff --git a/packages/rstream/src/from/view.ts b/packages/rstream/src/from/view.ts index 974245497b..7816226203 100644 --- a/packages/rstream/src/from/view.ts +++ b/packages/rstream/src/from/view.ts @@ -6,29 +6,38 @@ import { CommonOpts } from "../api"; import { Stream } from "../stream"; import { optsWithID } from "../utils/idgen"; +export interface FromViewOpts extends Partial { + path: Path; + tx?: ViewTransform; + equiv?: Predicate2; +} + /** - * Similar to `fromAtom()`, but creates an eager derived view for a + * Similar to {@link fromAtom}, but creates an eager derived view for a * nested value in atom / cursor and yields stream of its value changes. - * Views are readonly versions of Cursors and more lightweight. The view - * checks for value changes with given `equiv` predicate - * (`@thi.ng/equiv` by default). If the predicate returns a falsy - * result, the new value is emitted on the stream. The first value - * emitted is always the (possibly transformed) current value at the - * stream's start time (i.e. when the first subscriber attaches). + * + * @remarks + * Views are readonly and more lightweight versions of + * {@link @thi.ng/atom#Cursor}s. The view checks for value changes with + * given `equiv` predicate ({@link @thi.ng/equiv#equiv} by default). If + * the predicate returns a falsy result, the new value is emitted on the + * stream. The first value emitted is always the (possibly transformed) + * current value at the stream's start time (i.e. when the first + * subscriber attaches). * * If the optional `tx` is given, the raw value is first passed to this * transformer function and its result emitted on the stream. * * When the stream is cancelled the view is destroyed as well. * - * See: - * - fromAtom() - * - @thi.ng/atom - * - * ``` + * @example + * ```ts * db = new Atom({a: 1, b: {c: 2}}); * - * fromView(db, "b.c", (x) => x != null ? x : "n/a").subscribe(trace("view:")) + * fromView( + * db, + * { path: "b.c", tx: (x) => x != null ? x : "n/a" } + * ).subscribe(trace("view:")) * // view: 2 * * db.swapIn("b.c", (x: number) => x + 1); @@ -39,31 +48,28 @@ import { optsWithID } from "../utils/idgen"; * ``` * * @param atom - * @param path - * @param tx - * @param equiv * @param opts */ export const fromView = ( atom: ReadonlyAtom, - path: Path, - tx?: ViewTransform, - equiv?: Predicate2, - opts?: Partial -): Stream => - new Stream((stream) => { + opts: FromViewOpts +): Stream => { + opts = >optsWithID("view", opts); + return new Stream((stream) => { let isActive = true; + const tx = opts.tx; const view = new View( atom, - path, + opts.path, tx ? (x) => isActive && ((x = tx(x)), stream.next(x), x) : (x) => isActive && (stream.next(x), x), false, - equiv + opts.equiv ); return () => { isActive = false; view.release(); }; - }, optsWithID("view", opts)); + }); +}; diff --git a/packages/rstream/src/from/worker.ts b/packages/rstream/src/from/worker.ts index 1d9507cab6..6f7e008c24 100644 --- a/packages/rstream/src/from/worker.ts +++ b/packages/rstream/src/from/worker.ts @@ -3,32 +3,45 @@ import { Stream } from "../stream"; import { optsWithID } from "../utils/idgen"; import { makeWorker } from "../utils/worker"; +export interface FromWorkerOpts extends CommonOpts { + /** + * If true, the worker will be terminated when the stream + * is being closed. + * + * @defaultValue true + */ + terminate: boolean; +} + /** * Returns a new `Stream` instance which adds "message" and "error" * event listeners to given `worker` and then passes received values - * downstream. If `terminate` is true (default), the worker will be + * downstream. + * + * @remarks + * If `terminate` is true (default), the worker will be * terminated when the stream is being closed (either directly or * indirectly, i.e. if the user called `.done()` on the stream or the * last child subscription has unsubscribed). * - * As with `postWorker()`, the `worker` can be an existing `Worker` + * As with {@link postWorker}, the `worker` can be an existing `Worker` * instance, a JS source code `Blob` or an URL string. In the latter two * cases, a worker is created automatically using `utils/makeWorker()`. * - * ``` + * @example + * ```ts * * ``` * * @param worker - * @param terminate * @param opts */ export const fromWorker = ( worker: Worker | Blob | string, - terminate = true, - opts?: Partial + opts?: Partial ) => { const _worker = makeWorker(worker); + opts = optsWithID("worker", opts); return new Stream((stream) => { const ml = (e: MessageEvent) => { stream.next(e.data); @@ -41,10 +54,10 @@ export const fromWorker = ( return () => { _worker.removeEventListener("message", ml); _worker.removeEventListener("error", el); - if (terminate) { + if (opts!.terminate !== false) { LOGGER.info("terminating worker", _worker); _worker.terminate(); } }; - }, optsWithID("worker", opts)); + }, opts); }; diff --git a/packages/rstream/src/subs/transduce.ts b/packages/rstream/src/subs/transduce.ts index 5acad5e00c..2d450570f5 100644 --- a/packages/rstream/src/subs/transduce.ts +++ b/packages/rstream/src/subs/transduce.ts @@ -5,8 +5,8 @@ import { Subscription } from "../subscription"; /** * Returns a promise which subscribes to given input and transforms * incoming values using given transducer `xform` and reducer `rfn`. - * Once the input is done the promise will resolve with the final - * reduced result (or fail with error). + * Once the input or the reducer is done, the promise will resolve with + * the final reduced result (or fail with error). * * ``` * rs.transduce( diff --git a/packages/rstream/src/trigger.ts b/packages/rstream/src/trigger.ts index e44d9806a3..aee7e96f5b 100644 --- a/packages/rstream/src/trigger.ts +++ b/packages/rstream/src/trigger.ts @@ -10,5 +10,5 @@ import { optsWithID } from "./utils/idgen"; export function trigger(): Stream; export function trigger(x: T): Stream; export function trigger(x: any = true) { - return fromIterable([x], 0, optsWithID("trigger")); + return fromIterable([x], optsWithID("trigger")); } diff --git a/packages/rstream/src/tween.ts b/packages/rstream/src/tween.ts index 09f70d32d4..1ca9b372d0 100644 --- a/packages/rstream/src/tween.ts +++ b/packages/rstream/src/tween.ts @@ -75,7 +75,12 @@ export const tween = ( }, closeIn: CloseMode.FIRST }).transform( - scan(reducer(() => initial, (acc, { src }) => mix(acc, src))), + scan( + reducer( + () => initial, + (acc, { src }) => mix(acc, src) + ) + ), dedupe(stop || (() => false)) ); diff --git a/packages/rstream/test/from-atom.ts b/packages/rstream/test/from-atom.ts index a8f41c7216..81924f0607 100644 --- a/packages/rstream/test/from-atom.ts +++ b/packages/rstream/test/from-atom.ts @@ -7,7 +7,7 @@ describe("fromAtom", () => { it("works with atom", (done) => { let a = new Atom(0); - let src = fromAtom(a, false); + let src = fromAtom(a, { emitFirst:false }); let calledNext = false; src.subscribe({ next(x) { @@ -30,7 +30,7 @@ describe("fromAtom", () => { let state = { a: { b: {}, d: { e: 42 } } }; let a = new Atom(state); let c = new Cursor(a, "a.b.c"); - let src = fromAtom(c, false); + let src = fromAtom(c, { emitFirst: false }); let calledNext = false; src.subscribe({ next(x) { @@ -55,7 +55,7 @@ describe("fromAtom", () => { let a = new Atom({}); let c = new Cursor(a, "a.b"); let h = new History(c); - let src = fromAtom(h, true); + let src = fromAtom(h); let buf: any[] = []; src.subscribe({ next(x) { buf.push(x); } }); h.reset(1); @@ -81,8 +81,8 @@ describe("fromAtom", () => { let h = new History(a); let c1 = new Cursor(a, "a.b"); let c2 = new Cursor(a, "c"); - let src1 = fromAtom(c1, true); - let src2 = fromAtom(c2, true); + let src1 = fromAtom(c1); + let src2 = fromAtom(c2); let buf1:any[] = []; let buf2:any[] = []; src1.subscribe({ next(x) { buf1.push(x); } }); diff --git a/packages/rstream/test/from-iterable.ts b/packages/rstream/test/from-iterable.ts index 0dce82dd5e..2ff3f476b0 100644 --- a/packages/rstream/test/from-iterable.ts +++ b/packages/rstream/test/from-iterable.ts @@ -54,7 +54,7 @@ describe("fromIterable()", () => { it("works with delay", (done) => { let buf: any[] = []; let t0 = Date.now(); - src = fromIterable(data, 10); + src = fromIterable(data, { delay: 10 }); src.subscribe({ next(x) { buf.push(x); @@ -71,7 +71,7 @@ describe("fromIterable()", () => { this.timeout(TIMEOUT * 5); let buf: any[] = []; let doneCalled = false; - src = fromIterable(data, TIMEOUT); + src = fromIterable(data, { delay: TIMEOUT }); src.subscribe({ next(x) { buf.push(x); diff --git a/packages/rstream/test/pubsub.ts b/packages/rstream/test/pubsub.ts index d776d94221..72b6dd3200 100644 --- a/packages/rstream/test/pubsub.ts +++ b/packages/rstream/test/pubsub.ts @@ -48,7 +48,13 @@ describe("PubSub", () => { assert.deepEqual( [...acc], [ - [["a", 0], [["a", 0], ["a", 0]]], + [ + ["a", 0], + [ + ["a", 0], + ["a", 0] + ] + ], [["a", 1], [["a", 1]]], [["b", 2], [["b", 2]]] ] @@ -68,7 +74,10 @@ describe("PubSub", () => { fromIterableSync("abcbd").subscribe(pub); assert.deepEqual(acc, { a: [["a", 0]], - b: [["b", 1], ["b", 3]], + b: [ + ["b", 1], + ["b", 3] + ], c: [], d: [] }); @@ -86,7 +95,7 @@ describe("PubSub", () => { pub = pubsub({ topic: (x) => x }); pub.subscribeTopic("a", collect); const b = pub.subscribeTopic("b", collect); - fromIterable("abcbd", TIMEOUT).subscribe(pub); + fromIterable("abcbd", { delay: TIMEOUT }).subscribe(pub); setTimeout(() => { pub.unsubscribeTopic("b", b); }, TIMEOUT * 2.5); diff --git a/packages/rstream/test/stream-merge.ts b/packages/rstream/test/stream-merge.ts index 234de61fd6..31a2b6d046 100644 --- a/packages/rstream/test/stream-merge.ts +++ b/packages/rstream/test/stream-merge.ts @@ -19,7 +19,10 @@ describe("StreamMerge", () => { buf.push(x); }, done() { - assert.deepEqual(buf.sort((a, b) => a - b), expected); + assert.deepEqual( + buf.sort((a, b) => a - b), + expected + ); done(); } }; @@ -41,8 +44,8 @@ describe("StreamMerge", () => { it("merges dynamic inputs", (done) => { src = merge(); - src.add(fromIterable([1, 2, 3, 4], 10)); - src.add(fromIterable([10, 20], 5)); + src.add(fromIterable([1, 2, 3, 4], { delay: 10 })); + src.add(fromIterable([10, 20], { delay: 5 })); src.subscribe(check([1, 2, 3, 4, 10, 20], done)); }); @@ -73,9 +76,10 @@ describe("StreamMerge", () => { }); it("transducer streams", (done) => { - const sources = [fromIterable([1, 2, 3]), fromIterable([4, 5, 6])].map( - (s) => s.subscribe(map((x) => fromIterable([x, x, x]))) - ); + const sources = [ + fromIterable([1, 2, 3]), + fromIterable([4, 5, 6]) + ].map((s) => s.subscribe(map((x) => fromIterable([x, x, x])))); const main = merge({ src: sources }); const histogram = frequencies(); let acc: any = histogram[0](); @@ -86,7 +90,14 @@ describe("StreamMerge", () => { done() { assert.deepEqual( acc, - new Map([[1, 3], [2, 3], [3, 3], [4, 3], [5, 3], [6, 3]]) + new Map([ + [1, 3], + [2, 3], + [3, 3], + [4, 3], + [5, 3], + [6, 3] + ]) ); done(); } diff --git a/packages/rstream/test/stream-sync.ts b/packages/rstream/test/stream-sync.ts index eeefdea747..6c99395ff0 100644 --- a/packages/rstream/test/stream-sync.ts +++ b/packages/rstream/test/stream-sync.ts @@ -42,8 +42,8 @@ describe("StreamSync", () => { }); const a1 = sync({ src: [ - (a = fromView(db, "a1.ins.a")), - (b = fromView(db, "a1.ins.b")) + (a = fromView(db, { path: "a1.ins.a" })), + (b = fromView(db, { path: "a1.ins.b" })) ], xform: adder() }); @@ -56,7 +56,7 @@ describe("StreamSync", () => { } }); const a2 = sync({ - src: [a1, (c = fromView(db, "a2.ins.b"))], + src: [a1, (c = fromView(db, { path: "a2.ins.b" }))], xform: adder() }); const res = a2.subscribe({ @@ -165,7 +165,10 @@ describe("StreamSync", () => { b: fromPromise(delayed("bb", 40)) } }), - comp(take(1), map(({ a, b }: any) => ({ a, b }))), + comp( + take(1), + map(({ a, b }: any) => ({ a, b })) + ), last() ).then((res) => { assert.deepEqual(res, { a: "aa", b: "bb" }); @@ -176,8 +179,8 @@ describe("StreamSync", () => { it("never closes", (done) => { const main = sync({ src: [ - fromIterable([1, 2, 3], TIMEOUT, { id: "a" }), - fromIterable([1, 2, 3, 4], TIMEOUT, { id: "b" }) + fromIterable([1, 2, 3], { delay: TIMEOUT, id: "a" }), + fromIterable([1, 2, 3, 4], { delay: TIMEOUT, id: "b" }) ], closeIn: CloseMode.NEVER, closeOut: CloseMode.NEVER, diff --git a/packages/rstream/test/subscription.ts b/packages/rstream/test/subscription.ts index 4f6c2c9d31..aead03ba7f 100644 --- a/packages/rstream/test/subscription.ts +++ b/packages/rstream/test/subscription.ts @@ -11,7 +11,7 @@ describe("Subscription", () => { it("new sub receives last", function(done) { this.timeout(TIMEOUT * 5); let buf: any[] = []; - src = fromIterable([1, 2, 3], TIMEOUT); + src = fromIterable([1, 2, 3], { delay: TIMEOUT }); src.subscribe({ next(x) { buf.push(x); @@ -36,7 +36,7 @@ describe("Subscription", () => { this.timeout(TIMEOUT * 5); let buf: any[] = []; let called = false; - src = fromIterable([1, 2, 3], TIMEOUT); + src = fromIterable([1, 2, 3], { delay: TIMEOUT }); const sub = src.subscribe({ next(x) { buf.push(x); @@ -60,7 +60,7 @@ describe("Subscription", () => { let buf: any[] = []; let called = false; - src = fromIterable([1, 2, 3], TIMEOUT); + src = fromIterable([1, 2, 3], { delay: TIMEOUT }); const sub = src.subscribe( { next(x) { @@ -83,7 +83,7 @@ describe("Subscription", () => { it("completing transducer sends all values", (done) => { let buf: any[] = []; - src = fromIterable([1, 2, 3], 10); + src = fromIterable([1, 2, 3], { delay: 10 }); src.subscribe( { next(x) {