Skip to content

Commit

Permalink
feat(rstream): fix #6 update StreamMerge to support transduced input …
Browse files Browse the repository at this point in the history
…streams

- any Subscription values (incl. Streams) sent by inputs are added
   to the set of inputs themselves and not passed downstream
- add test case
  • Loading branch information
postspectacular committed Mar 20, 2018
1 parent db61b0b commit 8026409
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
8 changes: 7 additions & 1 deletion packages/rstream/src/stream-merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ export class StreamMerge<A, B> extends Subscription<A, B> {
this.sources.set(
src,
src.subscribe({
next: (x) => this.next(x),
next: (x) => {
if (x instanceof Subscription) {
this.add(x);
} else {
this.next(x);
}
},
done: () => this.markDone(src)
}));
}
Expand Down
18 changes: 18 additions & 0 deletions packages/rstream/test/stream-merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,22 @@ describe("StreamMerge", () => {
src.subscribe(check([1, 2, 2, 3, 10, 11, 20, 21], done));
});

it("transducer streams", (done) => {
const sources = [
rs.fromIterable([1, 2, 3]),
rs.fromIterable([4, 5, 6]),
].map(
(s) => s.subscribe(tx.map(x => rs.fromIterable([x, x, x])))
);
const merge = new rs.StreamMerge({ src: sources });
const histogram = tx.frequencies();
let acc: any = histogram[0]();
merge.subscribe({
next(x) { acc = histogram[2](acc, x) },
done() {
assert.deepEqual(acc, new Map([[1, 3], [2, 3], [3, 3], [4, 3], [5, 3], [6, 3]]));
done();
}
})
});
});

0 comments on commit 8026409

Please sign in to comment.