Skip to content

Commit

Permalink
feat(rstream): update resolve(), update subscribe() overrides
Browse files Browse the repository at this point in the history
- replace resolve() opt `id` arg w/ `ResolveOpts` object
- if `fail` option is given use as Promise failure handler instead of
  calling `this.error()` and thereby stopping stream
- add new override for actual child `Subscription`s, fixes generics
- update `subscribe()` for Subscription, Stream, PubSub
  • Loading branch information
postspectacular committed May 20, 2018
1 parent 0e103c5 commit 23fdd39
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 11 deletions.
3 changes: 2 additions & 1 deletion packages/rstream/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ export class PubSub<A, B> extends Subscription<A, B> {
return null;
}

subscribeTopic(topicID: any, sub: Partial<ISubscriber<B>>, id?: string): Subscription<B, B>;
subscribeTopic<C>(topicID: any, tx: Transducer<B, C>, id?: string): Subscription<B, C>;
subscribeTopic<C>(topicID: any, sub: Subscription<B, C>): Subscription<B, C>;
subscribeTopic(topicID: any, sub: Partial<ISubscriber<B>>, id?: string): Subscription<B, B>;
subscribeTopic(topicID: any, sub: any, id?: string): Subscription<any, any> {
let t = this.topics.get(topicID);
if (!t) {
Expand Down
5 changes: 3 additions & 2 deletions packages/rstream/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ export class Stream<T> extends Subscription<T, T>
this.src = src;
}

subscribe(sub: Partial<ISubscriber<T>>, id?: string): Subscription<T, T>
subscribe<C>(sub: Partial<ISubscriber<C>>, xform: Transducer<T, C>, id?: string): Subscription<T, C>;
subscribe<C>(sub: Subscription<T, C>): Subscription<T, C>;
subscribe<C>(xform: Transducer<T, C>, id?: string): Subscription<T, C>;
subscribe<C>(sub: Partial<ISubscriber<C>>, xform: Transducer<T, C>, id?: string): Subscription<T, C>
subscribe(sub: Partial<ISubscriber<T>>, id?: string): Subscription<T, T>;
subscribe(...args: any[]) {
const wrapped = super.subscribe.apply(this, args);
if (this.subs.size === 1) {
Expand Down
19 changes: 13 additions & 6 deletions packages/rstream/src/subs/resolve.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { IID } from "@thi.ng/api/api";
import { DEBUG, State } from "../api";
import { Subscription } from "../subscription";

export interface ResolverOpts extends IID<string> {
fail: (e: any) => void;
}

export class Resolver<T> extends Subscription<Promise<T>, T> {

protected outstanding = 0;
protected fail: (e: any) => void;

constructor(id?: string) {
super(null, null, null, id || `resolve-${Subscription.NEXT_ID++}`);
constructor(opts: Partial<ResolverOpts> = {}) {
super(null, null, null, opts.id || `resolve-${Subscription.NEXT_ID++}`);
this.fail = opts.fail;
}

next(x: Promise<T>) {
Expand All @@ -22,7 +29,7 @@ export class Resolver<T> extends Subscription<Promise<T>, T> {
DEBUG && console.log(`resolved value in ${State[this.state]} state (${x})`);
}
},
(e) => this.error(e)
(e) => (this.fail || this.error)(e)
);
}

Expand All @@ -33,6 +40,6 @@ export class Resolver<T> extends Subscription<Promise<T>, T> {
}
}

export function resolve<T>(id?: string) {
return new Resolver<T>(id);
}
export function resolve<T>(opts?: Partial<ResolverOpts>) {
return new Resolver<T>(opts);
}
5 changes: 3 additions & 2 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ export class Subscription<A, B> implements
* Creates new child subscription with given subscriber and/or
* transducer and optional subscription ID.
*/
subscribe(sub: Partial<ISubscriber<B>>, id?: string): Subscription<B, B>
subscribe<C>(sub: Partial<ISubscriber<C>>, xform: Transducer<B, C>, id?: string): Subscription<B, C>;
subscribe<C>(sub: Subscription<B, C>): Subscription<B, C>;
subscribe<C>(xform: Transducer<B, C>, id?: string): Subscription<B, C>;
subscribe<C>(sub: Partial<ISubscriber<C>>, xform: Transducer<B, C>, id?: string): Subscription<B, C>
subscribe(sub: Partial<ISubscriber<B>>, id?: string): Subscription<B, B>;
subscribe(...args: any[]) {
this.ensureState();
let sub, xform, id;
Expand Down

0 comments on commit 23fdd39

Please sign in to comment.