From db0ab34fcea8869d9c85c51f5faacf1e1f6bb0ec Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 10 Mar 2021 11:23:41 +0000 Subject: [PATCH] feat(rstream): update Sub2, State enum - add State.UNSUBSCRIBED - add missing Sub2.done() handling - add Sub2.map() - refactor Sub2 value/phase dispatch logic - add logging --- packages/rstream/src/api.ts | 1 + packages/rstream/src/sub2.ts | 171 ++++++++++++++++++++++++----------- 2 files changed, 120 insertions(+), 52 deletions(-) diff --git a/packages/rstream/src/api.ts b/packages/rstream/src/api.ts index 1faad88cfe..2708a4c283 100644 --- a/packages/rstream/src/api.ts +++ b/packages/rstream/src/api.ts @@ -6,6 +6,7 @@ export enum State { IDLE, ACTIVE, DONE, + UNSUBSCRIBED, ERROR, DISABLED, // TODO currently unused } diff --git a/packages/rstream/src/sub2.ts b/packages/rstream/src/sub2.ts index aa648c68a9..b85a83d13b 100644 --- a/packages/rstream/src/sub2.ts +++ b/packages/rstream/src/sub2.ts @@ -1,8 +1,9 @@ -import { assert, SEMAPHORE } from "@thi.ng/api"; +import { assert, Fn, NULL_LOGGER, SEMAPHORE } from "@thi.ng/api"; import { isPlainObject } from "@thi.ng/checks"; import { comp, isReduced, + map, peek, push, Reduced, @@ -15,6 +16,7 @@ import { CommonOpts, ISubscriber, ISubscription, + LOGGER, State, SubscriptionOpts, TransformableOpts, @@ -140,37 +142,119 @@ export class Sub2 implements ISubscription { ); } + /** + * Syntax sugar for {@link Subscription.transform} when using a + * single {@link @thi.ng/transducers#map} transducer only. The given + * function `fn` is used as `map`'s transformation fn. + * + * @param fn + * @param opts + */ + map( + fn: Fn, + opts?: Partial + ): ISubscription { + return this.transform(map(fn), opts || {}); + } + + unsubscribe(sub?: Partial>) { + LOGGER.debug(this.id, "unsub start", sub ? sub.id : "self"); + if (!sub) { + this.parent && this.parent.unsubscribe(this); + this.state = State.UNSUBSCRIBED; + this.release(); + return true; + } + LOGGER.debug(this.id, "unsub child", sub.id); + if (this.subs.delete(sub)) { + if ( + this.closeOut === CloseMode.FIRST || + (!this.subs.size && this.closeOut !== CloseMode.NEVER) + ) { + this.unsubscribe(); + } + return true; + } + return false; + } + next(x: A) { if (this.state >= State.DONE) return; this.xform ? this.dispatchXform(x) : this.dispatch(x); } - dispatch(x: B) { - this.cacheLast && (this.last = x); + done() { + LOGGER.debug(this.id, "entering done()"); + if (this.state >= State.DONE) return; + if (this.xform) { + if (!this.dispatchXformDone()) return; + } + // attempt to call .done in wrapped sub + if (!this.dispatchTo("done")) return; + // disconnect from parent & internal cleanup + this.unsubscribe(); + this.state = State.DONE; + LOGGER.debug(this.id, "exiting done()"); + } + + error(e: any) { + // only the wrapped sub's error handler gets a chance + // to deal with the error + const sub = this.wrapped; + const hasErrorHandler = sub && sub.error; + hasErrorHandler && + LOGGER.debug(this.id, "attempting wrapped error handler"); + // flag success if error handler returns true + // (i.e. it could handle/recover from the error) + // else detach this entire sub by going into error state... + return (hasErrorHandler && sub!.error!(e)) || this.unhandledError(e); + } + + protected unhandledError(e: any) { + // ensure error is at least logged to console + // even if default NULL_LOGGER is used... + (LOGGER !== NULL_LOGGER ? LOGGER : console).warn( + this.id, + "unhandled error:", + e + ); + this.unsubscribe(); + this.state = State.ERROR; + return false; + } + + protected dispatchTo(type: "next" | "done", x?: B) { let s: Partial> | undefined = this.wrapped; if (s) { try { - s.next && s.next(x); + s[type] && s[type]!(x!); } catch (e) { // give wrapped sub a chance to handle error - if (!this.error(e)) return; + // (if that failed then we're already in error state now & terminate) + if (!this.error(e)) return false; } } - for (s of this.subs) { + // process other child subs + for (s of type === "next" ? this.subs : [...this.subs]) { try { - s.next && s.next(x); + s[type] && s[type]!(x!); } catch (e) { - // give sub a chance to handle error - // but terminate if handler missing or unsuccessful if (!s.error || !s.error(e)) { - this.unhandledError(e); - return; + // if no or failed handler, go into error state + return this.unhandledError(e); } } } + return true; } - dispatchXform(x: A) { + protected dispatch(x: B) { + LOGGER.debug(this.id, "dispatch", x); + this.cacheLast && (this.last = x); + this.dispatchTo("next", x); + } + + protected dispatchXform(x: A) { let acc: B[] | Reduced; try { acc = this.xform![2]([], x); @@ -181,54 +265,37 @@ export class Sub2 implements ISubscription { // don't dispatch value(s) return; } - const uacc = unreduced(acc); - const n = uacc.length; - for (let i = 0; i < n; i++) { - this.dispatch(uacc[i]); - if (this.state === State.ERROR) return; + if (this.dispatchXformVals(acc)) { + isReduced(acc) && this.done(); } - isReduced(acc) && this.done(); - } - - done() {} - - error(e: any) { - // only the wrapped sub's error handler gets a chance - // to deal with the error - const sub = this.wrapped; - // flag success if error handler returns true - // (i.e. could handle/recover from the error) - // else detach this entire sub... - return (sub && sub.error && sub.error(e)) || this.unhandledError(e); } - unhandledError(e: any) { - console.warn("uncaught error", e); - this.unsubscribe(); - this.state = State.ERROR; - return false; + protected dispatchXformDone() { + let acc: B[] | Reduced; + try { + // collect remaining values from transducer + acc = this.xform![1]([]); + } catch (e) { + // error in transducer can only be handled by the wrapped + // subscriber's error handler (if avail) + return this.error(e); + } + return this.dispatchXformVals(acc); } - unsubscribe(sub?: Partial>) { - if (!sub) { - this.parent && this.parent.unsubscribe(this); - this.state = State.DONE; - this.release(); - return true; - } - if (this.subs.delete(sub)) { - if ( - this.closeOut === CloseMode.FIRST || - (!this.subs.size && this.closeOut !== CloseMode.NEVER) - ) { - this.unsubscribe(); - } - return true; + protected dispatchXformVals(acc: B[] | Reduced) { + const uacc = unreduced(acc); + for ( + let i = 0, n = uacc.length; + i < n && this.state < State.DONE; + i++ + ) { + this.dispatch(uacc[i]); } - return false; + return this.state < State.ERROR; } - release() { + protected release() { this.subs.clear(); delete this.parent; delete this.xform;