Skip to content

Commit

Permalink
feat(rstream): update Sub2, State enum
Browse files Browse the repository at this point in the history
- add State.UNSUBSCRIBED
- add missing Sub2.done() handling
- add Sub2.map()
- refactor Sub2 value/phase dispatch logic
- add logging
  • Loading branch information
postspectacular committed Mar 10, 2021
1 parent 1a7f5c3 commit db0ab34
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 52 deletions.
1 change: 1 addition & 0 deletions packages/rstream/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export enum State {
IDLE,
ACTIVE,
DONE,
UNSUBSCRIBED,
ERROR,
DISABLED, // TODO currently unused
}
Expand Down
171 changes: 119 additions & 52 deletions packages/rstream/src/sub2.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { assert, SEMAPHORE } from "@thi.ng/api";
import { assert, Fn, NULL_LOGGER, SEMAPHORE } from "@thi.ng/api";
import { isPlainObject } from "@thi.ng/checks";
import {
comp,
isReduced,
map,
peek,
push,
Reduced,
Expand All @@ -15,6 +16,7 @@ import {
CommonOpts,
ISubscriber,
ISubscription,
LOGGER,
State,
SubscriptionOpts,
TransformableOpts,
Expand Down Expand Up @@ -140,37 +142,119 @@ export class Sub2<A, B> implements ISubscription<A, B> {
);
}

/**
* Syntax sugar for {@link Subscription.transform} when using a
* single {@link @thi.ng/transducers#map} transducer only. The given
* function `fn` is used as `map`'s transformation fn.
*
* @param fn
* @param opts
*/
map<C>(
fn: Fn<B, C>,
opts?: Partial<WithErrorHandlerOpts>
): ISubscription<B, C> {
return this.transform(map(fn), opts || {});
}

unsubscribe(sub?: Partial<ISubscriber<B>>) {
LOGGER.debug(this.id, "unsub start", sub ? sub.id : "self");
if (!sub) {
this.parent && this.parent.unsubscribe(this);
this.state = State.UNSUBSCRIBED;
this.release();
return true;
}
LOGGER.debug(this.id, "unsub child", sub.id);
if (this.subs.delete(sub)) {
if (
this.closeOut === CloseMode.FIRST ||
(!this.subs.size && this.closeOut !== CloseMode.NEVER)
) {
this.unsubscribe();
}
return true;
}
return false;
}

next(x: A) {
if (this.state >= State.DONE) return;
this.xform ? this.dispatchXform(x) : this.dispatch(<any>x);
}

dispatch(x: B) {
this.cacheLast && (this.last = x);
done() {
LOGGER.debug(this.id, "entering done()");
if (this.state >= State.DONE) return;
if (this.xform) {
if (!this.dispatchXformDone()) return;
}
// attempt to call .done in wrapped sub
if (!this.dispatchTo("done")) return;
// disconnect from parent & internal cleanup
this.unsubscribe();
this.state = State.DONE;
LOGGER.debug(this.id, "exiting done()");
}

error(e: any) {
// only the wrapped sub's error handler gets a chance
// to deal with the error
const sub = this.wrapped;
const hasErrorHandler = sub && sub.error;
hasErrorHandler &&
LOGGER.debug(this.id, "attempting wrapped error handler");
// flag success if error handler returns true
// (i.e. it could handle/recover from the error)
// else detach this entire sub by going into error state...
return (hasErrorHandler && sub!.error!(e)) || this.unhandledError(e);
}

protected unhandledError(e: any) {
// ensure error is at least logged to console
// even if default NULL_LOGGER is used...
(LOGGER !== NULL_LOGGER ? LOGGER : console).warn(
this.id,
"unhandled error:",
e
);
this.unsubscribe();
this.state = State.ERROR;
return false;
}

protected dispatchTo(type: "next" | "done", x?: B) {
let s: Partial<ISubscriber<B>> | undefined = this.wrapped;
if (s) {
try {
s.next && s.next(x);
s[type] && s[type]!(x!);
} catch (e) {
// give wrapped sub a chance to handle error
if (!this.error(e)) return;
// (if that failed then we're already in error state now & terminate)
if (!this.error(e)) return false;
}
}
for (s of this.subs) {
// process other child subs
for (s of type === "next" ? this.subs : [...this.subs]) {
try {
s.next && s.next(x);
s[type] && s[type]!(x!);
} catch (e) {
// give sub a chance to handle error
// but terminate if handler missing or unsuccessful
if (!s.error || !s.error(e)) {
this.unhandledError(e);
return;
// if no or failed handler, go into error state
return this.unhandledError(e);
}
}
}
return true;
}

dispatchXform(x: A) {
protected dispatch(x: B) {
LOGGER.debug(this.id, "dispatch", x);
this.cacheLast && (this.last = x);
this.dispatchTo("next", x);
}

protected dispatchXform(x: A) {
let acc: B[] | Reduced<B[]>;
try {
acc = this.xform![2]([], x);
Expand All @@ -181,54 +265,37 @@ export class Sub2<A, B> implements ISubscription<A, B> {
// don't dispatch value(s)
return;
}
const uacc = unreduced(acc);
const n = uacc.length;
for (let i = 0; i < n; i++) {
this.dispatch(uacc[i]);
if (this.state === State.ERROR) return;
if (this.dispatchXformVals(acc)) {
isReduced(acc) && this.done();
}
isReduced(acc) && this.done();
}

done() {}

error(e: any) {
// only the wrapped sub's error handler gets a chance
// to deal with the error
const sub = this.wrapped;
// flag success if error handler returns true
// (i.e. could handle/recover from the error)
// else detach this entire sub...
return (sub && sub.error && sub.error(e)) || this.unhandledError(e);
}

unhandledError(e: any) {
console.warn("uncaught error", e);
this.unsubscribe();
this.state = State.ERROR;
return false;
protected dispatchXformDone() {
let acc: B[] | Reduced<B[]>;
try {
// collect remaining values from transducer
acc = this.xform![1]([]);
} catch (e) {
// error in transducer can only be handled by the wrapped
// subscriber's error handler (if avail)
return this.error(e);
}
return this.dispatchXformVals(acc);
}

unsubscribe(sub?: Partial<ISubscriber<B>>) {
if (!sub) {
this.parent && this.parent.unsubscribe(this);
this.state = State.DONE;
this.release();
return true;
}
if (this.subs.delete(sub)) {
if (
this.closeOut === CloseMode.FIRST ||
(!this.subs.size && this.closeOut !== CloseMode.NEVER)
) {
this.unsubscribe();
}
return true;
protected dispatchXformVals(acc: B[] | Reduced<B[]>) {
const uacc = unreduced(acc);
for (
let i = 0, n = uacc.length;
i < n && this.state < State.DONE;
i++
) {
this.dispatch(uacc[i]);
}
return false;
return this.state < State.ERROR;
}

release() {
protected release() {
this.subs.clear();
delete this.parent;
delete this.xform;
Expand Down

0 comments on commit db0ab34

Please sign in to comment.