Skip to content

Commit

Permalink
feat(rstream): add Subscription.transform()
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Apr 15, 2018
1 parent 7cf93fc commit 2164ddf
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { illegalArity, illegalState } from "@thi.ng/api/error";
import { isFunction } from "@thi.ng/checks/is-function";
import { implementsFunction } from "@thi.ng/checks/implements-function";
import { isString } from "@thi.ng/checks/is-string";
import { Reducer, Transducer, SEMAPHORE } from "@thi.ng/transducers/api";
import { comp } from "@thi.ng/transducers/func/comp";
import { push } from "@thi.ng/transducers/rfn/push";
import { isReduced, unreduced } from "@thi.ng/transducers/reduced";

Expand Down Expand Up @@ -45,6 +47,10 @@ export class Subscription<A, B> implements
return this.state;
}

/**
* Creates new child subscription with given subscriber and/or
* transducer and optional subscription ID.
*/
subscribe(sub: Partial<ISubscriber<B>>, id?: string): Subscription<B, B>
subscribe<C>(xform: Transducer<B, C>, id?: string): Subscription<B, C>;
subscribe<C>(sub: Partial<ISubscriber<C>>, xform: Transducer<B, C>, id?: string): Subscription<B, C>
Expand Down Expand Up @@ -83,6 +89,12 @@ export class Subscription<A, B> implements
return <Subscription<B, B>>this.addWrapped(sub);
}

/**
* Returns array of new child subscriptions for all given
* subscribers.
*
* @param subs
*/
subscribeAll(...subs: ISubscriber<B>[]) {
const wrapped: Subscription<B, B>[] = [];
for (let s of subs) {
Expand All @@ -91,6 +103,27 @@ export class Subscription<A, B> implements
return wrapped;
}

/**
* Creates a new child subscription using given transducers and
* optional subscription ID. Supports up to 4 transducers and if
* more than one transducer is given, composes them in left-to-right
* order using @thi.ng/transducers `comp()`.
*
* Shorthand for `subscribe(comp(xf1, xf2,...), id)`
*/
transform<C>(a: Transducer<B, C>, id?: string): Subscription<B, C>;
transform<C, D>(a: Transducer<B, C>, b: Transducer<C, D>, id?: string): Subscription<B, D>;
transform<C, D, E>(a: Transducer<B, C>, b: Transducer<C, D>, c: Transducer<D, E>, id?: string): Subscription<B, E>;
transform<C, D, E, F>(a: Transducer<B, C>, b: Transducer<C, D>, c: Transducer<D, E>, d: Transducer<E, F>, id?: string): Subscription<B, F>;
transform(...xf: any[]) {
const n = xf.length - 1;
if (isString(xf[n])) {
return this.subscribe((<any>comp)(...xf.slice(0, n)), xf[n]);
} else {
return this.subscribe((<any>comp)(...xf));
}
}

/**
* If called without arg, removes this subscription from parent (if
* any), cleans up internal state and goes into DONE state. If
Expand Down

0 comments on commit 2164ddf

Please sign in to comment.