Skip to content

Commit

Permalink
fix(mergeAll): use higher-order lettable version of mergeAll
Browse files Browse the repository at this point in the history
jasonaden committed Sep 28, 2017

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 60c96ab commit f0b703b
Showing 1 changed file with 2 additions and 57 deletions.
59 changes: 2 additions & 57 deletions src/operator/mergeAll.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Observer } from '../Observer';
import { Subscription } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { Subscribable } from '../Observable';
import { subscribeToResult } from '../util/subscribeToResult';
import { mergeAll as higherOrder } from '../operators/mergeAll';

export function mergeAll<T>(this: Observable<T>, concurrent?: number): T;
export function mergeAll<T, R>(this: Observable<T>, concurrent?: number): Subscribable<R>;
@@ -54,56 +50,5 @@ export function mergeAll<T, R>(this: Observable<T>, concurrent?: number): Subscr
* @owner Observable
*/
export function mergeAll<T>(this: Observable<T>, concurrent: number = Number.POSITIVE_INFINITY): T {
return <any>this.lift<any>(new MergeAllOperator<T>(concurrent));
}

export class MergeAllOperator<T> implements Operator<Observable<T>, T> {
constructor(private concurrent: number) {
}

call(observer: Observer<T>, source: any): any {
return source.subscribe(new MergeAllSubscriber(observer, this.concurrent));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
export class MergeAllSubscriber<T> extends OuterSubscriber<Observable<T>, T> {
private hasCompleted: boolean = false;
private buffer: Observable<T>[] = [];
private active: number = 0;

constructor(destination: Observer<T>, private concurrent: number) {
super(destination);
}

protected _next(observable: Observable<T>) {
if (this.active < this.concurrent) {
this.active++;
this.add(subscribeToResult<Observable<T>, T>(this, observable));
} else {
this.buffer.push(observable);
}
}

protected _complete() {
this.hasCompleted = true;
if (this.active === 0 && this.buffer.length === 0) {
this.destination.complete();
}
}

notifyComplete(innerSub: Subscription) {
const buffer = this.buffer;
this.remove(innerSub);
this.active--;
if (buffer.length > 0) {
this._next(buffer.shift());
} else if (this.active === 0 && this.hasCompleted) {
this.destination.complete();
}
}
return <any>higherOrder(concurrent)(this);
}

0 comments on commit f0b703b

Please sign in to comment.