diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index 552ad56451..461a4e09ea 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -5,6 +5,7 @@ import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { Observable } from '../Observable'; import { TeardownLogic } from '../Subscription'; +import { Subscription } from '../Subscription'; import { TimeoutError } from '../util/TimeoutError'; /** @@ -45,6 +46,8 @@ class TimeoutOperator implements Operator { class TimeoutSubscriber extends Subscriber { private index: number = 0; private _previousIndex: number = 0; + private action: Subscription = null; + get previousIndex(): number { return this._previousIndex; } @@ -65,18 +68,34 @@ class TimeoutSubscriber extends Subscriber { private static dispatchTimeout(state: any): void { const source = state.subscriber; const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { + if (source.previousIndex === currentIndex) { source.notifyTimeout(); } } private scheduleTimeout(): void { - let currentIndex = this.index; - this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex }); + const currentIndex = this.index; + const timeoutState = { subscriber: this, index: currentIndex }; + + this.cancelTimeout(); + this.action = this.scheduler.schedule( + TimeoutSubscriber.dispatchTimeout, this.waitFor, timeoutState + ); + this.add(this.action); + this.index++; this._previousIndex = currentIndex; } + private cancelTimeout(): void { + const { action } = this; + if (action !== null) { + this.remove(action); + action.unsubscribe(); + this.action = null; + } + } + protected _next(value: T): void { this.destination.next(value);