diff --git a/spec/operators/retry-spec.js b/spec/operators/retry-spec.js new file mode 100644 index 0000000000..8c23d6a2da --- /dev/null +++ b/spec/operators/retry-spec.js @@ -0,0 +1,69 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.retry()', function () { + it('should retry a number of times, without error, then complete', function (done) { + var errors = 0; + var retries = 2; + Observable.value(42) + .map(function(x){ + if ((errors+=1) < retries){ + throw 'bad'; + } + errors = 0; + return x; + }) + .retry(retries) + .subscribe( + function(x){ + expect(x).toBe(42); + }, + function(err){ + expect('this was called').toBe(false); + }, done); + }); + it('should retry a number of times, then call error handler', function (done) { + var errors = 0; + var retries = 2; + Observable.value(42) + .map(function(x){ + if ((errors+=1) < retries){ + throw 'bad'; + } + return x; + }) + .retry(retries-1) + .subscribe( + function(x){ + expect(x).toBe(42); + }, + function(err){ + expect(errors).toBe(1); + done(); + }, function(){ + expect('this was called').toBe(false); + }); + }); + it('should retry until successful completion', function (done) { + var errors = 0; + var retries = 10; + Observable.value(42) + .map(function(x){ + if ((errors+=1) < retries){ + throw 'bad'; + } + errors = 0; + return x; + }) + .retry() + .take(retries) + .subscribe( + function(x){ + expect(x).toBe(42); + }, + function(err){ + expect('this was called').toBe(false); + }, done); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 8362b5f901..c00a0e3e18 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -157,6 +157,7 @@ export default class Observable { multicast: (subjectFactory: () => Subject) => ConnectableObservable; catch: (selector: (err: any, source: Observable, caught: Observable) => Observable) => Observable; + retry: (count: number) => Observable; retryWhen: (notifier: (errors: Observable) => Observable) => Observable; repeat: (count: number) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index c8284b9f2b..19b95d44b4 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -146,10 +146,12 @@ observableProto.defaultIfEmpty = defaultIfEmpty; observableProto.materialize = materialize; import _catch from './operators/catch'; +import retry from './operators/retry'; import retryWhen from './operators/retryWhen'; import repeat from './operators/repeat'; observableProto.catch = _catch; +observableProto.retry = retry; observableProto.retryWhen = retryWhen; observableProto.repeat = repeat; diff --git a/src/operators/retry.ts b/src/operators/retry.ts new file mode 100644 index 0000000000..ff6b460ab7 --- /dev/null +++ b/src/operators/retry.ts @@ -0,0 +1,38 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; + +export default function retry(count: number = 0): Observable { + return this.lift(new RetryOperator(count, this)); +} + +export class RetryOperator extends Operator { + constructor(private count: number, protected original:Observable) { + super(); + } + + call(observer: Observer): Observer { + return new RetrySubscriber(observer, this.count, this.original); + } +} + +export class RetrySubscriber extends Subscriber { + private retries: number = 0; + constructor(destination: Observer, private count: number, private original: Observable) { + super(destination); + } + + _error(err: any) { + const count = this.count; + if (count && count === (this.retries+=1)){ + this.destination.error(err); + } else { + this.resubscribe(); + } + } + + resubscribe() { + this.original.subscribe(this); + } +} \ No newline at end of file