diff --git a/packages/rstream/src/from/atom.ts b/packages/rstream/src/from/atom.ts index 926b3d7847..8ba5584303 100644 --- a/packages/rstream/src/from/atom.ts +++ b/packages/rstream/src/from/atom.ts @@ -32,7 +32,7 @@ export interface FromAtomOpts extends CommonOpts { * emitted on the stream. If `emitFirst` is true (default), also emits * atom's current value when first subscriber attaches to stream. * - * Also see {@link fromView} + * Also see {@link fromView}, {@link fromViewUnsafe} * * @example * ```ts @@ -59,7 +59,7 @@ export const fromAtom = ( opts = optsWithID("atom", >{ emitFirst: true, changed: (a, b) => a !== b, - ...opts + ...opts, }); return new Stream((stream) => { atom.addWatch(stream.id, (_, prev, curr) => { diff --git a/packages/rstream/src/from/view.ts b/packages/rstream/src/from/view.ts index 168f78216c..4dbf45a0ac 100644 --- a/packages/rstream/src/from/view.ts +++ b/packages/rstream/src/from/view.ts @@ -1,16 +1,40 @@ import { View } from "@thi.ng/atom"; import { Stream } from "../stream"; import { optsWithID } from "../utils/idgen"; -import type { Fn, Path, Predicate2 } from "@thi.ng/api"; +import type { + DeepPath, + Fn, + Path, + Path0, + Path1, + Path2, + Path3, + Path4, + Path5, + Path6, + Path7, + Path8, + PathVal1, + PathVal2, + PathVal3, + PathVal4, + PathVal5, + PathVal6, + PathVal7, + PathVal8, + Predicate2, +} from "@thi.ng/api"; import type { ReadonlyAtom } from "@thi.ng/atom"; import type { CommonOpts } from "../api"; -export interface FromViewOpts extends Partial { - path: Path; - tx?: Fn; - equiv?: Predicate2; +export interface FromViewOpts extends Partial { + path: P; + tx?: Fn; + equiv?: Predicate2; } +export type FromViewUnsafeOpts = FromViewOpts; + /** * Similar to {@link fromAtom}, but creates an eager derived view for a * nested value in atom / cursor and yields stream of its value changes. @@ -18,30 +42,34 @@ export interface FromViewOpts extends Partial { * @remarks * Views are readonly and more lightweight versions of * {@link @thi.ng/atom#Cursor | cursors}. The view checks for value - * changes with given `equiv` predicate ({@link @thi.ng/equiv#equiv} by - * default). If the predicate returns a falsy result, the new value is - * emitted on the stream. The first value emitted is always the - * (possibly transformed) current value at the stream's start time (i.e. - * when the first subscriber attaches). + * changes with given `equiv` predicate (default: + * {@link @thi.ng/equiv#equiv}). If the predicate returns a falsy result + * (i.e. the new value), the new value is emitted on the stream. The + * first value emitted is always the (possibly transformed) current + * value at the stream's start time (i.e. when the first subscriber + * attaches). * * If the `tx` option is given, the raw value is first passed to this * transformer function and its result emitted on the stream instead. * * When the stream is cancelled the view is destroyed as well. * + * Also see {@link @thi.ng/atom#defView}, {@link @thi.ng/atom#defViewUnsafe} + * * @example * ```ts - * db = new Atom({ a: 1, b: { c: 2 }}); + * const db = defAtom({ a: 1, b: { c: 2 }}); * - * fromView( + * fromViewUnsafe( * db, * { * path: "b.c", - * tx: (x) => x != null ? x : "n/a" - * }).subscribe(trace("view:")) + * tx: (x) => x != null ? String(x) : "n/a" + * } + * ).subscribe(trace("view:")) * // view: 2 * - * db.swapIn("b.c", (x: number) => x + 1); + * db.swapIn(["b","c"], (x: number) => x + 1); * // view: 3 * * db.reset({ a: 10 }); @@ -51,15 +79,86 @@ export interface FromViewOpts extends Partial { * @param atom - * @param opts - */ -export const fromView = ( +export const fromViewUnsafe = ( + atom: ReadonlyAtom, + opts: FromViewUnsafeOpts +): Stream => fromView(atom, opts); + +/** + * Type checked version of {@link fromViewUnsafe}. Only the first 8 path + * levels are type checked. + * + * @remarks + * Stream value type is inferred from target path or (if given), the + * result type of the optional view transformer (`tx` option). + * + * Also see {@link @thi.ng/atom#defView}, + * {@link @thi.ng/atom#defViewUnsafe} + * + * @param parent - + * @param opts - + */ +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts +): Stream; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts, PathVal1, R> +): Stream : R>; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts, PathVal2, R> +): Stream : R>; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts, PathVal3, R> +): Stream : R>; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts, PathVal4, R> +): Stream : R>; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts, PathVal5, R> +): Stream : R>; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts< + Path6, + PathVal6, + R + > +): Stream : R>; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts< + Path7, + PathVal7, + R + > +): Stream : R>; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts< + Path8, + PathVal8, + R + > +): Stream : R>; +export function fromView( + parent: ReadonlyAtom, + opts: FromViewOpts, any, R> +): Stream; +export function fromView( atom: ReadonlyAtom, - opts: FromViewOpts -): Stream => { - opts = >optsWithID("view", opts); - return new Stream((stream) => { + opts: FromViewOpts +): Stream { + opts = >optsWithID("view", opts); + return new Stream((stream) => { let isActive = true; const tx = opts.tx; - const view = new View( + const view = new View( atom, opts.path, tx @@ -73,4 +172,4 @@ export const fromView = ( view.release(); }; }); -}; +} diff --git a/packages/rstream/test/stream-sync.ts b/packages/rstream/test/stream-sync.ts index 6c99395ff0..da9e2a3e74 100644 --- a/packages/rstream/test/stream-sync.ts +++ b/packages/rstream/test/stream-sync.ts @@ -7,6 +7,7 @@ import { take } from "@thi.ng/transducers"; import * as assert from "assert"; +import { TIMEOUT } from "./config"; import { CloseMode, fromInterval, @@ -16,9 +17,8 @@ import { State, stream, sync, - transduce + transduce, } from "../src"; -import { TIMEOUT } from "./config"; describe("StreamSync", () => { function adder() { @@ -38,14 +38,14 @@ describe("StreamSync", () => { let a1buf, a2buf; const db = new Atom({ a1: { ins: { a: 1, b: 2 } }, - a2: { ins: { b: 10 } } + a2: { ins: { b: 10 } }, }); const a1 = sync({ src: [ - (a = fromView(db, { path: "a1.ins.a" })), - (b = fromView(db, { path: "a1.ins.b" })) + (a = fromView(db, { path: ["a1", "ins", "a"] })), + (b = fromView(db, { path: ["a1", "ins", "b"] })), ], - xform: adder() + xform: adder(), }); const a1res = a1.subscribe({ next(x) { @@ -53,11 +53,11 @@ describe("StreamSync", () => { }, done() { a1done = true; - } + }, }); const a2 = sync({ - src: [a1, (c = fromView(db, { path: "a2.ins.b" }))], - xform: adder() + src: [a1, (c = fromView(db, { path: ["a2", "ins", "b"] }))], + xform: adder(), }); const res = a2.subscribe({ next(x) { @@ -65,7 +65,7 @@ describe("StreamSync", () => { }, done() { a2done = true; - } + }, }); assert.equal(a1buf, 3); assert.equal(a2buf, 13); @@ -97,7 +97,7 @@ describe("StreamSync", () => { const src = { a: stream(), b: stream(), - c: stream() + c: stream(), }; const res: any[] = []; const main = sync({ src, mergeOnly: true }).subscribe({ @@ -107,10 +107,10 @@ describe("StreamSync", () => { { c: 1 }, { c: 1, b: 2 }, { c: 1, b: 2, a: 3 }, - { c: 1, b: 2, a: 4 } + { c: 1, b: 2, a: 4 }, ]); done(); - } + }, }); src.c.next(1); @@ -124,12 +124,12 @@ describe("StreamSync", () => { const src = { a: stream(), b: stream(), - c: stream() + c: stream(), }; const res: any[] = []; const main = sync({ src, - mergeOnly: true + mergeOnly: true, }) .transform( // ensure `a` & `b` are present @@ -140,10 +140,10 @@ describe("StreamSync", () => { done: () => { assert.deepEqual(res, [ { c: 1, b: 2, a: 3 }, - { c: 1, b: 2, a: 4 } + { c: 1, b: 2, a: 4 }, ]); done(); - } + }, }); src.c.next(1); @@ -162,8 +162,8 @@ describe("StreamSync", () => { src: { t: fromInterval(5), a: fromPromise(delayed("aa", 20)), - b: fromPromise(delayed("bb", 40)) - } + b: fromPromise(delayed("bb", 40)), + }, }), comp( take(1), @@ -180,18 +180,18 @@ describe("StreamSync", () => { const main = sync({ src: [ fromIterable([1, 2, 3], { delay: TIMEOUT, id: "a" }), - fromIterable([1, 2, 3, 4], { delay: TIMEOUT, id: "b" }) + fromIterable([1, 2, 3, 4], { delay: TIMEOUT, id: "b" }), ], closeIn: CloseMode.NEVER, closeOut: CloseMode.NEVER, - reset: true + reset: true, }); const acc: any[] = []; const sub = main.subscribe({ next(x) { acc.push(x); - } + }, }); setTimeout(() => sub.unsubscribe(), 3.5 * TIMEOUT); @@ -200,7 +200,7 @@ describe("StreamSync", () => { assert.deepEqual(acc, [ { a: 1, b: 1 }, { a: 2, b: 2 }, - { a: 3, b: 3 } + { a: 3, b: 3 }, ]); done(); }, 5 * TIMEOUT);