diff --git a/packages/rstream/src/stream-merge.ts b/packages/rstream/src/stream-merge.ts index a72cd3903b..d4ae06bc49 100644 --- a/packages/rstream/src/stream-merge.ts +++ b/packages/rstream/src/stream-merge.ts @@ -37,7 +37,13 @@ export class StreamMerge extends Subscription { this.sources.set( src, src.subscribe({ - next: (x) => this.next(x), + next: (x) => { + if (x instanceof Subscription) { + this.add(x); + } else { + this.next(x); + } + }, done: () => this.markDone(src) })); } diff --git a/packages/rstream/test/stream-merge.ts b/packages/rstream/test/stream-merge.ts index f6d35dddd8..cf8be68424 100644 --- a/packages/rstream/test/stream-merge.ts +++ b/packages/rstream/test/stream-merge.ts @@ -72,4 +72,22 @@ describe("StreamMerge", () => { src.subscribe(check([1, 2, 2, 3, 10, 11, 20, 21], done)); }); + it("transducer streams", (done) => { + const sources = [ + rs.fromIterable([1, 2, 3]), + rs.fromIterable([4, 5, 6]), + ].map( + (s) => s.subscribe(tx.map(x => rs.fromIterable([x, x, x]))) + ); + const merge = new rs.StreamMerge({ src: sources }); + const histogram = tx.frequencies(); + let acc: any = histogram[0](); + merge.subscribe({ + next(x) { acc = histogram[2](acc, x) }, + done() { + assert.deepEqual(acc, new Map([[1, 3], [2, 3], [3, 3], [4, 3], [5, 3], [6, 3]])); + done(); + } + }) + }); });