diff --git a/packages/rstream/src/index.ts b/packages/rstream/src/index.ts index 4f10c477ad..55b1d5e66e 100644 --- a/packages/rstream/src/index.ts +++ b/packages/rstream/src/index.ts @@ -19,6 +19,7 @@ export * from "./from/raf"; export * from "./from/view"; export * from "./from/worker"; +export * from "./subs/asidechain"; export * from "./subs/bisect"; export * from "./subs/post-worker"; export * from "./subs/resolve"; diff --git a/packages/rstream/src/subs/asidechain.ts b/packages/rstream/src/subs/asidechain.ts new file mode 100644 index 0000000000..4a819f5bd0 --- /dev/null +++ b/packages/rstream/src/subs/asidechain.ts @@ -0,0 +1,27 @@ +import { CommonOpts } from "../api"; +import { Subscription } from "../subscription"; + +/** + * Abstract base class for sidechained subscription types (e.g. + * {@link sidechainPartition}, {@link sidechainToggle}). + */ +export abstract class ASidechain extends Subscription { + sideSub!: Subscription; + + constructor(opts?: Partial) { + super(undefined, opts); + } + + unsubscribe(sub?: Subscription) { + const res = super.unsubscribe(sub); + if (!sub || !this.subs.length) { + this.sideSub.unsubscribe(); + } + return res; + } + + done() { + this.sideSub.unsubscribe(); + super.done(); + } +} diff --git a/packages/rstream/src/subs/sidechain-partition.ts b/packages/rstream/src/subs/sidechain-partition.ts index ca57758879..a33c992a34 100644 --- a/packages/rstream/src/subs/sidechain-partition.ts +++ b/packages/rstream/src/subs/sidechain-partition.ts @@ -2,6 +2,7 @@ import { Predicate } from "@thi.ng/api"; import { CommonOpts, ISubscribable, State } from "../api"; import { Subscription } from "../subscription"; import { optsWithID } from "../utils/idgen"; +import { ASidechain } from "./asidechain"; export interface SidechainPartitionOpts extends CommonOpts { pred: Predicate; @@ -39,16 +40,15 @@ export const sidechainPartition = ( opts?: Partial> ): Subscription => new SidechainPartition(side, opts); -export class SidechainPartition extends Subscription { - sideSub: Subscription; - buf: A[]; +export class SidechainPartition extends ASidechain { + buf: T[]; constructor( - side: ISubscribable, - opts?: Partial> + side: ISubscribable, + opts?: Partial> ) { opts = optsWithID("sidepart", opts); - super(undefined, opts); + super(opts); this.buf = []; const pred = opts.pred || (() => true); const $this = this; @@ -69,22 +69,9 @@ export class SidechainPartition extends Subscription { }); } - unsubscribe(sub?: Subscription) { - const res = super.unsubscribe(sub); - if (!sub || !this.subs.length) { - this.sideSub.unsubscribe(); - } - return res; - } - - next(x: A) { + next(x: T) { if (this.state < State.DONE) { this.buf.push(x); } } - - done() { - this.sideSub.unsubscribe(); - super.done(); - } } diff --git a/packages/rstream/src/subs/sidechain-toggle.ts b/packages/rstream/src/subs/sidechain-toggle.ts index d68d1a0c26..fd79879571 100644 --- a/packages/rstream/src/subs/sidechain-toggle.ts +++ b/packages/rstream/src/subs/sidechain-toggle.ts @@ -1,7 +1,8 @@ import { Predicate } from "@thi.ng/api"; -import { CommonOpts, ISubscribable } from "../api"; +import { CommonOpts, ISubscribable, State } from "../api"; import { Subscription } from "../subscription"; import { optsWithID } from "../utils/idgen"; +import { ASidechain } from "./asidechain"; export interface SidechainToggleOpts extends CommonOpts { pred: Predicate; @@ -41,16 +42,15 @@ export const sidechainToggle = ( opts?: Partial> ): Subscription => new SidechainToggle(side, opts); -export class SidechainToggle extends Subscription { - sideSub: Subscription; +export class SidechainToggle extends ASidechain { isActive: boolean; constructor( - side: ISubscribable, - opts?: Partial> + side: ISubscribable, + opts?: Partial> ) { opts = optsWithID("sidetoggle", opts); - super(undefined, opts); + super(opts); this.isActive = !!opts.initial; const pred = opts.pred || (() => true); const $this = this; @@ -66,22 +66,9 @@ export class SidechainToggle extends Subscription { }); } - unsubscribe(sub?: Subscription) { - const res = super.unsubscribe(sub); - if (!sub || !this.subs.length) { - this.sideSub.unsubscribe(); - } - return res; - } - - next(x: A) { - if (this.isActive) { + next(x: T) { + if (this.isActive && this.state < State.DONE) { super.next(x); } } - - done() { - super.done(); - this.sideSub.unsubscribe(); - } }