Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rstream] error handling, subscribe/transform cleanups #279

Merged
merged 37 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
22c6f7c
feat(rstream): add .transform() error handler opt (#276)
postspectacular Mar 5, 2021
123e15d
feat(rstream): add PubSub.transformTopic()
postspectacular Mar 5, 2021
aca9908
test(rstream): update tests
postspectacular Mar 5, 2021
fe0eaa9
feat(rstream): update ITransformable.transform()
postspectacular Mar 5, 2021
08adc5f
feat(rstream): add generic type for PubSub topics
postspectacular Mar 5, 2021
b59a054
docs(rstream): add/update docstrings
postspectacular Mar 5, 2021
98edee0
feat(rstream): add ISubscription interface
postspectacular Mar 5, 2021
90f19fc
test(rstream): update tests
postspectacular Mar 5, 2021
224f614
refactor(rstream-graph): update types to use ISubscription
postspectacular Mar 6, 2021
f299612
refactor(rstream-query): update types to use ISubscription
postspectacular Mar 6, 2021
9e290fe
feat(rstream): further simplify ISubscribable & impls
postspectacular Mar 6, 2021
009b83b
refactor(rstream-graph): update .subscribe() call sites
postspectacular Mar 6, 2021
0c7cc2d
minor(rstream-query): minor update
postspectacular Mar 6, 2021
fa87168
feat(rstream): update PubSub
postspectacular Mar 6, 2021
a101626
feat(rstream): add sidechainPartitionRAF()
postspectacular Mar 6, 2021
3e318f9
refactor(examples): update rstream examples
postspectacular Mar 6, 2021
fd5860f
docs(transducers): update docstrings
postspectacular Mar 6, 2021
1f11cdf
refactor(rdom): simplify $sub()
postspectacular Mar 6, 2021
f4001bd
docs(rstream): update readmes
postspectacular Mar 7, 2021
509520a
test(rstream): update pubsub test
postspectacular Mar 8, 2021
015380a
feat(rstream): update error handler sig (#281)
postspectacular Mar 9, 2021
de4149b
feat(rstream): add Sub2 WIP impl
postspectacular Mar 9, 2021
145cfe8
refactor(rdom-canvas): update to use ISubscription only
postspectacular Mar 9, 2021
c651421
fix(rstream): minor update/revert sub ctor args
postspectacular Mar 9, 2021
1a7f5c3
refactor(examples): prefer use ISubscription/ISubscriber
postspectacular Mar 9, 2021
db0ab34
feat(rstream): update Sub2, State enum
postspectacular Mar 10, 2021
a9e4040
feat(rstream): #281 update Subscription error/teardown logic
postspectacular Mar 10, 2021
1bd0702
test(rstream): update tests
postspectacular Mar 10, 2021
ae4866a
fix(rstream): fix wrong imports
postspectacular Mar 10, 2021
a8a8c44
feat(rstream): update DONE state & teardown logic
postspectacular Mar 11, 2021
73023b6
feat(rstream): add StreamSource error handling
postspectacular Mar 11, 2021
b3c86de
docs(rstream): add docstrings
postspectacular Mar 11, 2021
ea1d0c1
feat(rstream): update Subscription FSM, add/update tests
postspectacular Mar 12, 2021
9f16633
docs(rstream): update readme, pkg meta
postspectacular Mar 12, 2021
014bf20
perf(rstream): revert to storing child subs in array
postspectacular Mar 12, 2021
cca0f34
fix(rstream): PubSub dispatch & error handling
postspectacular Mar 12, 2021
ae591a1
fix(rstream): update failing tests
postspectacular Mar 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/adaptive-threshold/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export const defHandler = <E extends EventType>(
error: console.warn,
};
return xform
? eventProc.subscribeTopic(id, {}, {}).subscribe(sub, xform)
? eventProc.subscribeTopic(id, sub, { xform })
: eventProc.subscribeTopic(id, sub);
};

Expand Down
7 changes: 2 additions & 5 deletions examples/adaptive-threshold/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { peek } from "@thi.ng/arrays";
import type { PackedBuffer } from "@thi.ng/pixel";
import { fromRAF, sidechainPartition } from "@thi.ng/rstream";
import { sidechainPartitionRAF } from "@thi.ng/rstream";
import { map } from "@thi.ng/transducers";
import { updateDOM } from "@thi.ng/transducers-hdom";
import {
Expand Down Expand Up @@ -113,6 +112,4 @@ const app = (state: AppState) => {
// sidechain to buffer intra-frame state updates. then only passes the
// most recent one to `app()` and its resulting UI tree to the
// `updateDOM()` transducer
state
.subscribe(sidechainPartition<AppState, number>(fromRAF()))
.transform(map(peek), map(app), updateDOM());
sidechainPartitionRAF(state).transform(map(app), updateDOM());
4 changes: 2 additions & 2 deletions examples/bitmap-font/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { IObjectOf } from "@thi.ng/api";
import { dropdown } from "@thi.ng/hdom-components";
import { clamp } from "@thi.ng/math";
import { reactive, Stream, Subscription, sync } from "@thi.ng/rstream";
import { ISubscriber, reactive, Stream, sync } from "@thi.ng/rstream";
import {
comp,
map,
Expand All @@ -17,7 +17,7 @@ import { bits } from "@thi.ng/transducers-binary";
import { updateDOM } from "@thi.ng/transducers-hdom";
import { FONT } from "./font";

const emitOnStream = (stream: Subscription<any, any>) => (e: Event) =>
const emitOnStream = (stream: ISubscriber<any>) => (e: Event) =>
stream.next((<HTMLSelectElement>e.target).value);

// retrieve font bytes for given char
Expand Down
4 changes: 2 additions & 2 deletions examples/canvas-dial/src/dial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { peek } from "@thi.ng/arrays";
import { isString } from "@thi.ng/checks";
import { canvas2D } from "@thi.ng/hdom-components";
import { fitClamped } from "@thi.ng/math";
import type { Subscription } from "@thi.ng/rstream";
import type { ISubscription } from "@thi.ng/rstream";
import { GestureEvent, gestureStream } from "@thi.ng/rstream-gestures";
import { heading, sub2 } from "@thi.ng/vectors";

Expand Down Expand Up @@ -128,7 +128,7 @@ export const dial = (_opts: Partial<DialOpts>) => {
font: "10px sans-serif",
..._opts,
};
let events: Subscription<any, GestureEvent>;
let events: ISubscription<any, GestureEvent>;
let cx: number, cy: number;
const startTheta = opts.base + opts.gap / 2;

Expand Down
6 changes: 3 additions & 3 deletions examples/crypto-chart/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import { resolve } from "@thi.ng/resolve-map";
import {
fromEvent,
fromInterval,
ISubscriber,
reactive,
resolve as resolvePromise,
stream,
Subscription,
sync,
trace,
} from "@thi.ng/rstream";
Expand Down Expand Up @@ -173,11 +173,11 @@ const API_URL = (market: string, symbol: string, period: number) =>
// stub for local testing
// const API_URL = (..._) => `ohlc.json`;

const emitOnStream = (stream: Subscription<any, any>) => (e: Event) =>
const emitOnStream = (stream: ISubscriber<any>) => (e: Event) =>
stream.next((<HTMLInputElement>e.target).value);

const menu = (
stream: Subscription<any, any>,
stream: ISubscriber<any>,
title: string,
items: DropDownOption[]
) =>
Expand Down
12 changes: 6 additions & 6 deletions examples/fft-synth/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ import { fit, fitClamped } from "@thi.ng/math";
import {
fromAtom,
fromDOMEvent,
fromRAF,
merge,
sidechainPartition,
sidechainPartitionRAF,
sync,
} from "@thi.ng/rstream";
import { gestureStream } from "@thi.ng/rstream-gestures";
Expand Down Expand Up @@ -82,7 +81,8 @@ const app = () => {
};
};

main.subscribe(sidechainPartition<any, number>(fromRAF())).transform(
map(app()),
updateDOM()
);
// subscription & transformation of app state stream. uses a RAF
// sidechain to buffer intra-frame state updates. then only passes the
// most recent one to `app()` and its resulting UI tree to the
// `updateDOM()` transducer
sidechainPartitionRAF(main).transform(map(app()), updateDOM());
16 changes: 9 additions & 7 deletions examples/hdom-benchmark/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ const fpsCounter = (
);
},
},
// stream transducer to compute the windowed moving avarage
comp(
benchmark(),
movingAverage(period),
map((x) => 1000 / x),
partition(width, 1, true)
)
{
// stream transducer to compute the windowed moving avarage
xform: comp(
benchmark(),
movingAverage(period),
map((x) => 1000 / x),
partition(width, 1, true)
),
}
);
return [
{
Expand Down
4 changes: 2 additions & 2 deletions examples/hdom-canvas-shapes/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { COMMENT, serialize } from "@thi.ng/hiccup";
import { convertTree, svg } from "@thi.ng/hiccup-svg";
import { sincos } from "@thi.ng/math";
import { concat, skewX23, translation23 } from "@thi.ng/matrices";
import { fromRAF, stream, Subscription, sync } from "@thi.ng/rstream";
import { fromRAF, ISubscriber, stream, sync } from "@thi.ng/rstream";
import { map, range, repeatedly } from "@thi.ng/transducers";
import { updateDOM } from "@thi.ng/transducers-hdom";
import { addN } from "@thi.ng/vectors";
Expand Down Expand Up @@ -297,7 +297,7 @@ const TESTS: any = {
};

// test case selection dropdown
const choices = (_: any, target: Subscription<string, any>, id: string) => [
const choices = (_: any, target: ISubscriber<string>, id: string) => [
dropdown,
{
class: "w4 ma2",
Expand Down
15 changes: 6 additions & 9 deletions examples/imgui/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ import { setInManyUnsafe } from "@thi.ng/paths";
import {
fromAtom,
fromDOMEvent,
fromRAF,
merge,
sidechainPartition,
sidechainPartitionRAF,
sync,
} from "@thi.ng/rstream";
import { gestureStream } from "@thi.ng/rstream-gestures";
Expand Down Expand Up @@ -552,10 +551,8 @@ const main = sync({
},
});

// transform the stream:
main
// group potentially higher frequency event updates & sync with RAF
// to avoid extraneous real DOM/Canvas updates
.subscribe(sidechainPartition<any, number>(fromRAF()))
// then apply main compoment function & apply hdom
.transform(map(app()), updateDOM());
// subscription & transformation of app state stream. uses a RAF
// sidechain to buffer intra-frame state updates. then only passes the
// most recent one to `app()` and its resulting UI tree to the
// `updateDOM()` transducer
sidechainPartitionRAF(main).transform(map(app()), updateDOM());
7 changes: 3 additions & 4 deletions examples/rdom-lissajous/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import { $canvas } from "@thi.ng/rdom-canvas";
import {
fromDOMEvent,
fromRAF,
ISubscribable,
ISubscription,
reactive,
Subscription,
sync,
} from "@thi.ng/rstream";
import { map, slidingWindow } from "@thi.ng/transducers";

const slider = (
dest: Subscription<number, number>,
dest: ISubscription<number, number>,
desc: string,
tooltip: string,
attribs?: any
Expand Down Expand Up @@ -60,7 +59,7 @@ size.next(<any>null);

// combine various reactive parameters
// and transform via transducers
const dots: ISubscribable<any[]> = sync({
const dots: ISubscription<any, any[]> = sync({
src: { a, b, scale, size, time: fromRAF() },
}).transform(
// compute next lissajous point
Expand Down
4 changes: 2 additions & 2 deletions examples/rdom-search-docs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { timed } from "@thi.ng/bench";
import { anchor, div, inputText } from "@thi.ng/hiccup-html";
import { $compile, $list, $text, Component, IComponent } from "@thi.ng/rdom";
import { debounce, reactive, Stream, Subscription } from "@thi.ng/rstream";
import { debounce, ISubscription, reactive, Stream } from "@thi.ng/rstream";
import { map } from "@thi.ng/transducers";
// @ts-ignore
import { deserialize } from "@ygoe/msgpack";
Expand All @@ -20,7 +20,7 @@ class DocSearch extends Component {
inner!: IComponent;
pager!: Pagination<string[][]>;
query!: Stream<string>;
queryResults!: Subscription<string, string[][]>;
queryResults!: ISubscription<string, string[][]>;

updateQuery(e: InputEvent) {
const term = (<HTMLInputElement>e.target).value;
Expand Down
8 changes: 4 additions & 4 deletions examples/rdom-search-docs/src/pagination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import type { IRelease } from "@thi.ng/api";
import { equiv } from "@thi.ng/equiv";
import { button, div } from "@thi.ng/hiccup-html";
import { clamp } from "@thi.ng/math";
import { reactive, Stream, Subscription, sync } from "@thi.ng/rstream";
import { ISubscription, reactive, Stream, sync } from "@thi.ng/rstream";
import { comp, dedupe, map, page } from "@thi.ng/transducers";

export class Pagination<T extends any[]> implements IRelease {
page: Stream<number>;
maxPage: Subscription<T, number>;
resultPage: Subscription<any, T>;
maxPage: ISubscription<T, number>;
resultPage: ISubscription<any, T>;

constructor(src: Subscription<any, T>, pageSize: number) {
constructor(src: ISubscription<any, T>, pageSize: number) {
this.page = reactive(0);
this.maxPage = src.transform(map((res) => ~~(res.length / pageSize)));
// produce search result page using `page()` transducer
Expand Down
4 changes: 1 addition & 3 deletions examples/rstream-event-loop/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ export const defHandler = <E extends EventType>(
next: <Fn<Event, void>>handler,
error: console.warn,
};
return xform
? eventProc.subscribeTopic(id, {}, {}).subscribe(sub, xform)
: eventProc.subscribeTopic(id, sub);
return eventProc.subscribeTopic(id, sub, { xform });
};

/**
Expand Down
43 changes: 34 additions & 9 deletions examples/rstream-event-loop/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { peek } from "@thi.ng/arrays";
import { fromRAF, sidechainPartition } from "@thi.ng/rstream";
import { sidechainPartitionRAF } from "@thi.ng/rstream";
import { map } from "@thi.ng/transducers";
import { updateDOM } from "@thi.ng/transducers-hdom";
import { AppState, NEXT, PREV } from "./api";
Expand All @@ -24,10 +23,38 @@ const app = ({ pageID, isLoading }: AppState) =>
// navigation buttons w/ event dispatch
[
"div",
["button", { onclick: () => dispatch([PREV, 5]) }, "<<"],
["button", { onclick: () => dispatch([PREV, 1]) }, "<"],
["button", { onclick: () => dispatch([NEXT, 1]) }, ">"],
["button", { onclick: () => dispatch([NEXT, 5]) }, ">>"],
[
"button",
{
disabled: pageID < 5,
onclick: () => dispatch([PREV, 5]),
},
"<<",
],
[
"button",
{
disabled: pageID === 0,
onclick: () => dispatch([PREV, 1]),
},
"<",
],
[
"button",
{
disabled: pageID === 19,
onclick: () => dispatch([NEXT, 1]),
},
">",
],
[
"button",
{
disabled: pageID >= 15,
onclick: () => dispatch([NEXT, 5]),
},
">>",
],
],
// only here to show timestamp of last DOM update
["div.mt3", new Date().toString()],
Expand All @@ -45,6 +72,4 @@ const page = (_: any, pageID: number) => ["h1", `Page: ${pageID}`];
// sidechain to buffer intra-frame state updates. then only passes the
// most recent one to `app()` and its resulting UI tree to the
// `updateDOM()` transducer
state
.subscribe(sidechainPartition<AppState, number>(fromRAF()))
.transform(map(peek), map(app), updateDOM());
sidechainPartitionRAF(state).transform(map(app), updateDOM());
14 changes: 5 additions & 9 deletions examples/rstream-hdom/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { peek } from "@thi.ng/arrays";
import {
fromRAF,
ISubscribable,
sidechainPartition,
Subscription,
ISubscriber,
sidechainPartitionRAF,
subscription,
sync,
} from "@thi.ng/rstream";
Expand Down Expand Up @@ -47,9 +45,7 @@ const ctx = {
* @param ctx user context object
*/
const domUpdate = (root: HTMLElement, tree: ISubscribable<any>, ctx?: any) =>
tree
.subscribe(sidechainPartition<any, number>(fromRAF()))
.transform(map(peek), updateDOM({ root, ctx }));
sidechainPartitionRAF(tree).transform(updateDOM({ root, ctx }));

/**
* Generic button component.
Expand All @@ -70,7 +66,7 @@ const button = (ctx: any, onclick: EventListener, body: any) => [
* @param _ hdom user context (unused)
* @param stream counter stream
*/
const clickButton = (_: any, stream: Subscription<boolean, number>) => [
const clickButton = (_: any, stream: ISubscriber<boolean>) => [
button,
() => stream.next(true),
stream.deref(),
Expand All @@ -82,7 +78,7 @@ const clickButton = (_: any, stream: Subscription<boolean, number>) => [
* @param _ hdom user context (unused)
* @param counters streams to reset
*/
const resetButton = (_: any, counters: Subscription<boolean, number>[]) => [
const resetButton = (_: any, counters: ISubscriber<boolean>[]) => [
button,
() => counters.forEach((c) => c.next(false)),
"reset",
Expand Down
4 changes: 2 additions & 2 deletions examples/webgl-msdf/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { canvasWebGL } from "@thi.ng/hdom-components";
import { fitClamped } from "@thi.ng/math";
import { concat, lookAt, perspective, transform44 } from "@thi.ng/matrices";
import { SYSTEM } from "@thi.ng/random";
import { fromDOMEvent, Subscription } from "@thi.ng/rstream";
import { fromDOMEvent, ISubscription } from "@thi.ng/rstream";
import {
$w,
add,
Expand Down Expand Up @@ -219,7 +219,7 @@ const app = () => {
const glyphs = convertGlyphs(GLYPHS);
let stars: ModelSpec;
let body: ModelSpec;
let mouse: Subscription<any, ReadonlyVec>;
let mouse: ISubscription<any, ReadonlyVec>;
let bg = 0;
const canvas = canvasWebGL({
init: async (el, gl) => {
Expand Down
4 changes: 2 additions & 2 deletions examples/xml-converter/src/ui.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Subscription } from "@thi.ng/rstream";
import type { ISubscriber } from "@thi.ng/rstream";
import { mapIndexed } from "@thi.ng/transducers";
import { handleTab } from "./utils";

Expand Down Expand Up @@ -143,7 +143,7 @@ const iconButton = (
const copyButton = (
{ copyButton }: any,
attribs: any,
stream: Subscription<boolean, boolean>,
stream: ISubscriber<boolean>,
text: string,
delay = 500
) => [
Expand Down
Loading