Skip to content

Commit

Permalink
feat(rstream): update args for various fromXXX() stream factories
Browse files Browse the repository at this point in the history
- add types for options objects
- update tests

BREAKING CHANGE: update stream factories to use options object args

- fromAtom
- fromInterval
- fromIterable
- fromView
- fromWorker
  • Loading branch information
postspectacular committed Nov 23, 2019
1 parent 898eb53 commit b466ebc
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 103 deletions.
63 changes: 41 additions & 22 deletions packages/rstream/src/from/atom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,37 @@ import { CommonOpts } from "../api";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";

export interface FromAtomOpts<T> extends CommonOpts {
/**
* True, if the current atom value should be emitted when the stream
* activates.
*
* @defaultValue true
*/
emitFirst: boolean;
/**
* User predicate to determine value changes in atom. New values are
* only emitted on stream if the predicate returns true.
*/
changed: Predicate2<T>;
}

/**
* Yields stream of value changes in given atom / cursor. Attaches watch
* to atom and checks for value changes with given `changed` predicate
* (`!==` by default). If the predicate returns truthy result, the new
* value is emitted on the stream. If `emitFirst` is true (default),
* also emits atom's current value when first subscriber attaches to
* stream.
* Yields stream of value changes in given
* {@link @thi.ng/atom# | Atom-like state container}.
*
* See:
* - fromView()
* - @thi.ng/atom
* @remarks
* Attaches a watch to the atom and checks for value changes with given
* `changed` predicate (`!==` by default). If the predicate returns
* truthy result, the new value is emitted on the stream. If `emitFirst`
* is true (default), also emits atom's current value when first
* subscriber attaches to stream.
*
* ```
* db = new Atom({a: 23, b: 88});
* Also see {@link fromView}
*
* @example
* ```ts
* db = new Atom({ a: 23, b: 88 });
* cursor = new Cursor(db, "a")
*
* rs.fromAtom(cursor).subscribe(rs.trace("cursor val:"))
Expand All @@ -31,22 +48,24 @@ import { optsWithID } from "../utils/idgen";
* ```
*
* @param atom
* @param emitFirst
* @param changed
* @param opts
*/
export const fromAtom = <T>(
atom: ReadonlyAtom<T>,
emitFirst = true,
changed?: Predicate2<T>,
opts?: Partial<CommonOpts>
): Stream<T> =>
new Stream<T>((stream) => {
changed = changed || ((a, b) => a !== b);
opts?: Partial<FromAtomOpts<T>>
): Stream<T> => {
opts = optsWithID("atom", <FromAtomOpts<T>>{
emitFirst: true,
changed: (a, b) => a !== b,
...opts
});
return new Stream<T>((stream) => {
atom.addWatch(stream.id, (_, prev, curr) => {
if (changed!(prev, curr)) {
if (opts!.changed!(prev, curr)) {
stream.next(curr);
}
});
emitFirst && stream.next(atom.deref());
opts!.emitFirst && stream.next(atom.deref());
return () => atom.removeWatch(stream.id);
}, optsWithID("atom", opts));
}, opts);
};
31 changes: 22 additions & 9 deletions packages/rstream/src/from/interval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,35 @@ import { CloseMode, CommonOpts } from "../api";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";

export interface FromIntervalOpts extends CommonOpts {
/**
* If given, only the stated number of values will be emitted (in
* the `[0...num)` interval) and the stream will become inactive (or
* close) after.
*
* @defaultValue Infinity
*/
num: number;
}

/**
* Returns a new `Stream` which emits a monotonically increasing counter
* value at given `delay` interval, up to an optionally defined max
* value (default: ∞), after which the stream is closed. The stream only
* starts when the first subscriber becomes available.
* Returns a new `Stream` of monotonically increasing counter values,
* emitted at given `delay` interval and up to the optionally defined
* max value (default: ∞), after which the stream is closed. The stream
* only starts when the first subscriber becomes available.
*
* @param delay
* @param count
* @param opts
*/
export const fromInterval = (
delay: number,
count = Infinity,
opts?: Partial<CommonOpts>
) =>
new Stream<number>((stream) => {
opts?: Partial<FromIntervalOpts>
) => {
opts = optsWithID("interval", <FromIntervalOpts>{ num: Infinity, ...opts });
return new Stream<number>((stream) => {
let i = 0;
let count = opts!.num!;
stream.next(i++);
let id = setInterval(() => {
stream.next(i++);
Expand All @@ -28,4 +40,5 @@ export const fromInterval = (
}
}, delay);
return () => clearInterval(id);
}, optsWithID("interval", opts));
}, opts);
};
16 changes: 12 additions & 4 deletions packages/rstream/src/from/iterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@ import { CloseMode, CommonOpts } from "../api";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";

export interface FromIterableOpts extends CommonOpts {
/**
* Time delay (in ms) between emitted values. The default value of
* 0, means as fast as possible (but still via `setInterval`).
*
* @defaultValue 0
*/
delay: number;
}

/**
* Creates a new `Stream` of given iterable which asynchronously calls
* `.next()` for each item of the iterable when the first (and in this
Expand All @@ -11,13 +21,11 @@ import { optsWithID } from "../utils/idgen";
* by default, but can be avoided by passing `false` as last argument.
*
* @param src
* @param delay
* @param opts
*/
export const fromIterable = <T>(
src: Iterable<T>,
delay = 0,
opts?: Partial<CommonOpts>
opts: Partial<FromIterableOpts> = {}
) =>
new Stream<T>((stream) => {
const iter = src[Symbol.iterator]();
Expand All @@ -29,7 +37,7 @@ export const fromIterable = <T>(
} else {
stream.next(val.value);
}
}, delay);
}, opts.delay || 0);
return () => clearInterval(id);
}, optsWithID("iterable", opts));

Expand Down
8 changes: 4 additions & 4 deletions packages/rstream/src/from/raf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import { optsWithID } from "../utils/idgen";
import { fromInterval } from "./interval";

/**
* Yields a stream of monotonically increasing counter, triggered by a
* `requestAnimationFrame()` loop (only available in browser
* Yields the stream of a monotonically increasing counter, triggered by
* a `requestAnimationFrame()` loop (only available in browser
* environments). In NodeJS, this function falls back to
* `fromInterval(16)`, yielding a similar (approximately 60fps) stream.
* `fromInterval(16)`, yielding a similar (approx. 60Hz) stream.
*
* Subscribers to this stream will be processed during that same loop
* iteration.
*/
export const fromRAF = (opts?: Partial<CommonOpts>) =>
isNode()
? fromInterval(16, undefined, opts)
? fromInterval(16, opts)
: new Stream<number>((stream) => {
let i = 0;
let isActive = true;
Expand Down
56 changes: 31 additions & 25 deletions packages/rstream/src/from/view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,38 @@ import { CommonOpts } from "../api";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";

export interface FromViewOpts<T> extends Partial<CommonOpts> {
path: Path;
tx?: ViewTransform<T>;
equiv?: Predicate2<any>;
}

/**
* Similar to `fromAtom()`, but creates an eager derived view for a
* Similar to {@link fromAtom}, but creates an eager derived view for a
* nested value in atom / cursor and yields stream of its value changes.
* Views are readonly versions of Cursors and more lightweight. The view
* checks for value changes with given `equiv` predicate
* (`@thi.ng/equiv` by default). If the predicate returns a falsy
* result, the new value is emitted on the stream. The first value
* emitted is always the (possibly transformed) current value at the
* stream's start time (i.e. when the first subscriber attaches).
*
* @remarks
* Views are readonly and more lightweight versions of
* {@link @thi.ng/atom#Cursor}s. The view checks for value changes with
* given `equiv` predicate ({@link @thi.ng/equiv#equiv} by default). If
* the predicate returns a falsy result, the new value is emitted on the
* stream. The first value emitted is always the (possibly transformed)
* current value at the stream's start time (i.e. when the first
* subscriber attaches).
*
* If the optional `tx` is given, the raw value is first passed to this
* transformer function and its result emitted on the stream.
*
* When the stream is cancelled the view is destroyed as well.
*
* See:
* - fromAtom()
* - @thi.ng/atom
*
* ```
* @example
* ```ts
* db = new Atom({a: 1, b: {c: 2}});
*
* fromView(db, "b.c", (x) => x != null ? x : "n/a").subscribe(trace("view:"))
* fromView(
* db,
* { path: "b.c", tx: (x) => x != null ? x : "n/a" }
* ).subscribe(trace("view:"))
* // view: 2
*
* db.swapIn("b.c", (x: number) => x + 1);
Expand All @@ -39,31 +48,28 @@ import { optsWithID } from "../utils/idgen";
* ```
*
* @param atom
* @param path
* @param tx
* @param equiv
* @param opts
*/
export const fromView = <T>(
atom: ReadonlyAtom<any>,
path: Path,
tx?: ViewTransform<T>,
equiv?: Predicate2<any>,
opts?: Partial<CommonOpts>
): Stream<T> =>
new Stream<T>((stream) => {
opts: FromViewOpts<T>
): Stream<T> => {
opts = <FromViewOpts<T>>optsWithID("view", opts);
return new Stream<T>((stream) => {
let isActive = true;
const tx = opts.tx;
const view = new View<T>(
atom,
path,
opts.path,
tx
? (x) => isActive && ((x = tx(x)), stream.next(x), x)
: (x) => isActive && (stream.next(x), x),
false,
equiv
opts.equiv
);
return () => {
isActive = false;
view.release();
};
}, optsWithID("view", opts));
});
};
29 changes: 21 additions & 8 deletions packages/rstream/src/from/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,45 @@ import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";
import { makeWorker } from "../utils/worker";

export interface FromWorkerOpts extends CommonOpts {
/**
* If true, the worker will be terminated when the stream
* is being closed.
*
* @defaultValue true
*/
terminate: boolean;
}

/**
* Returns a new `Stream` instance which adds "message" and "error"
* event listeners to given `worker` and then passes received values
* downstream. If `terminate` is true (default), the worker will be
* downstream.
*
* @remarks
* If `terminate` is true (default), the worker will be
* terminated when the stream is being closed (either directly or
* indirectly, i.e. if the user called `.done()` on the stream or the
* last child subscription has unsubscribed).
*
* As with `postWorker()`, the `worker` can be an existing `Worker`
* As with {@link postWorker}, the `worker` can be an existing `Worker`
* instance, a JS source code `Blob` or an URL string. In the latter two
* cases, a worker is created automatically using `utils/makeWorker()`.
*
* ```
* @example
* ```ts
*
* ```
*
* @param worker
* @param terminate
* @param opts
*/
export const fromWorker = <T>(
worker: Worker | Blob | string,
terminate = true,
opts?: Partial<CommonOpts>
opts?: Partial<FromWorkerOpts>
) => {
const _worker = makeWorker(worker);
opts = optsWithID("worker", opts);
return new Stream<T>((stream) => {
const ml = (e: MessageEvent) => {
stream.next(e.data);
Expand All @@ -41,10 +54,10 @@ export const fromWorker = <T>(
return () => {
_worker.removeEventListener("message", ml);
_worker.removeEventListener("error", <EventListener>el);
if (terminate) {
if (opts!.terminate !== false) {
LOGGER.info("terminating worker", _worker);
_worker.terminate();
}
};
}, optsWithID("worker", opts));
}, opts);
};
4 changes: 2 additions & 2 deletions packages/rstream/src/subs/transduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { Subscription } from "../subscription";
/**
* Returns a promise which subscribes to given input and transforms
* incoming values using given transducer `xform` and reducer `rfn`.
* Once the input is done the promise will resolve with the final
* reduced result (or fail with error).
* Once the input or the reducer is done, the promise will resolve with
* the final reduced result (or fail with error).
*
* ```
* rs.transduce(
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import { optsWithID } from "./utils/idgen";
export function trigger(): Stream<boolean>;
export function trigger<T>(x: T): Stream<T>;
export function trigger(x: any = true) {
return fromIterable([x], 0, optsWithID("trigger"));
return fromIterable([x], optsWithID("trigger"));
}
7 changes: 6 additions & 1 deletion packages/rstream/src/tween.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ export const tween = <T>(
},
closeIn: CloseMode.FIRST
}).transform(
scan(reducer(() => initial, (acc, { src }) => mix(acc, src))),
scan(
reducer(
() => initial,
(acc, { src }) => mix(acc, src)
)
),
dedupe(stop || (() => false))
);

Expand Down
Loading

0 comments on commit b466ebc

Please sign in to comment.