Skip to content

Commit

Permalink
feat(rstream): add stream() & subscription() factories, add/update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Aug 3, 2018
1 parent 55499cc commit e97aac0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
43 changes: 43 additions & 0 deletions packages/rstream/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,49 @@ import {
} from "./api";
import { Subscription } from "./subscription";

/**
* Creates a new `Stream` instance, optionally with given `StreamSource`
* function and / or ID. If a `src` function is provided, the function
* will be only called (with the `Stream` instance as single argument)
* once the first subscriber has attached to the stream. If the function
* returns another function, it will be used for cleanup purposes if the
* stream is cancelled, e.g. if the last subscriber has unsubscribed.
*
* Streams are intended as (primarily async) data sources in a dataflow
* graph and are the primary construct for the various `from*()`
* functions provided by the package. However, streams can also be
* triggered manually (from outside the stream), in which case the user
* should call `stream.next()` to cause value propagation.
*
* ```
* a = stream((s) => {
* s.next(1);
* s.next(2);
* s.done()
* });
* a.subscribe(trace("a:"))
* // a: 1
* // a: 2
* // a: done
* ```
*
* `Stream` (like `Subscription`) implements the @thi.ng/api `IDeref`
* interface which provides read access to the stream's last received
* value. This is useful for UI purposes, e.g. in combination with
* @thi.ng/hdom, which supports direct embedding of streams into UI
* components (will be deref'd automatically).
*
* @param id
* @param src
*/
export function stream(): Stream<any>;
export function stream(id: string): Stream<any>;
export function stream<T>(src: StreamSource<T>);
export function stream<T>(src: StreamSource<T>, id: string);
export function stream(src?, id?) {
return new Stream(src, id);
}

export class Stream<T> extends Subscription<T, T>
implements IStream<T> {

Expand Down
22 changes: 22 additions & 0 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,28 @@ import {
State
} from "./api";

/**
* Creates a new `Subscription` instance, the fundamental datatype &
* building block provided by this package. Subscriptions can be:
*
* - linked into directed graphs (not necessarily DAGs),
* - transformed using transducers (incl. early termination)
* - have any number of subscribers
* - recursively unsubscribe themselves from parent if no subscribers
* remain
* - implement @thi.ng/api `IDeref` interface
* - will go into a non-recoverable error state if NONE of the
* subscribers has an error handler itself
*
* @param sub
* @param xform
* @param parent
* @param id
*/
export function subscription<A, B>(sub?: ISubscriber<B>, xform?: Transducer<A, B>, parent?: ISubscribable<A>, id?: string) {
return new Subscription(sub, xform, parent, id);
}

export class Subscription<A, B> implements
IDeref<B>,
ISubscriber<A>,
Expand Down

0 comments on commit e97aac0

Please sign in to comment.