Skip to content

Commit

Permalink
feat(rstream): Subscription stores last value and passes to new subs
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Mar 20, 2018
1 parent ebe222c commit 6b87bca
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
10 changes: 9 additions & 1 deletion packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { isFunction } from "@thi.ng/checks/is-function";
import { implementsFunction } from "@thi.ng/checks/implements-function";
import { Reducer, Transducer } from "@thi.ng/transducers/api";
import { Reducer, Transducer, SEMAPHORE } from "@thi.ng/transducers/api";
import { push } from "@thi.ng/transducers/rfn/push";
import { isReduced, unreduced } from "@thi.ng/transducers/reduced";

Expand All @@ -19,9 +19,12 @@ export class Subscription<A, B> implements
protected xform: Reducer<B[], A>;
protected state: State = State.IDLE;

protected last: any;

constructor(sub?: ISubscriber<B>, xform?: Transducer<A, B>, parent?: ISubscribable<A>, id?: string) {
this.parent = parent;
this.id = id || `sub-${Subscription.NEXT_ID++}`;
this.last = SEMAPHORE;
this.subs = new Set();
if (sub) {
this.subs.add(<ISubscriber<B>>sub);
Expand Down Expand Up @@ -67,6 +70,9 @@ export class Subscription<A, B> implements
} else {
sub = new Subscription(sub, xform, this, id);
}
if (this.last !== SEMAPHORE) {
sub.next(this.last);
}
return <Subscription<B, B>>this.addWrapped(sub);
}

Expand All @@ -83,6 +89,7 @@ export class Subscription<A, B> implements
if (this.parent) {
const res = this.parent.unsubscribe(this);
this.state = State.DONE;
delete this.last;
delete this.parent;
return res;
}
Expand Down Expand Up @@ -167,6 +174,7 @@ export class Subscription<A, B> implements
}

protected dispatch(x: B) {
this.last = x;
for (let s of this.subs) {
try {
s.next && s.next(x);
Expand Down
26 changes: 26 additions & 0 deletions packages/rstream/test/subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// import * as tx from "@thi.ng/transducers";
import * as assert from "assert";

import * as rs from "../src/index";

describe("Subscription", () => {
let src: rs.Stream<number>;

beforeEach(() => {
});

it("new sub receives last", (done) => {
let buf = [];
src = rs.fromIterable([1, 2, 3], 10);
src.subscribe({ next(x) { buf.push(x); } });
setTimeout(() =>
src.subscribe({
next(x) { buf.push(x); },
done() {
assert.deepEqual(buf, [1, 2, 2, 3, 3]);
done();
}
}),
25);
});
});

0 comments on commit 6b87bca

Please sign in to comment.