Skip to content

Commit

Permalink
refactor(throttle): remove tryCatch/errorObject
Browse files Browse the repository at this point in the history
  • Loading branch information
tetsuharuohzeki committed Feb 10, 2016
1 parent 6060977 commit 7bae860
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions src/operator/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';
Expand Down Expand Up @@ -32,16 +30,26 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {

protected _next(value: T): void {
if (!this.throttled) {
const duration = tryCatch(this.durationSelector)(value);
if (duration === errorObject) {
this.destination.error(errorObject.e);
} else {
this.add(this.throttled = subscribeToResult(this, duration));
this.destination.next(value);
}
this.tryDurationSelector(value);
}
}

private tryDurationSelector(value: T): void {
let duration: Observable<number> | Promise<number> = null;
try {
duration = this.durationSelector(value);
} catch (err) {
this.destination.error(err);
return;
}
this.emitAndThrottle(value, duration);
}

private emitAndThrottle(value: T, duration: Observable<number> | Promise<number>) {
this.add(this.throttled = subscribeToResult(this, duration));
this.destination.next(value);
}

protected _unsubscribe() {
const throttled = this.throttled;
if (throttled) {
Expand Down

0 comments on commit 7bae860

Please sign in to comment.