diff --git a/spec/operators/retry-spec.ts b/spec/operators/retry-spec.ts index 87c640c9e68..353d6f3f7c0 100644 --- a/spec/operators/retry-spec.ts +++ b/spec/operators/retry-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { retry, map, take, mergeMap, concat, multicast, refCount } from 'rxjs/operators'; -import { Observable, Observer, of, throwError, Subject } from 'rxjs'; +import { Observable, Observer, defer, range, of, throwError, Subject } from 'rxjs'; declare function asDiagram(arg: string): Function; @@ -58,16 +58,94 @@ describe('retry operator', () => { retry(retries - 1) ).subscribe( (x: number) => { - expect(x).to.equal(42); + done("shouldn't next"); }, (err: any) => { expect(errors).to.equal(2); done(); }, () => { - expect('this was called').to.be.true; + done("shouldn't complete"); }); }); + it('should retry a number of times, then call error handler (with resetOnSuccess)', (done: MochaDone) => { + let errors = 0; + const retries = 2; + Observable.create((observer: Observer) => { + observer.next(42); + observer.complete(); + }).pipe( + map((x: any) => { + errors += 1; + throw 'bad'; + }), + retry({count: retries - 1, resetOnSuccess: true}) + ).subscribe( + (x: number) => { + done("shouldn't next"); + }, + (err: any) => { + expect(errors).to.equal(2); + done(); + }, () => { + done("shouldn't complete"); + }); + }); + + it('should retry a number of times, then call next handler without error, then retry and complete', (done: MochaDone) => { + let index = 0; + let errors = 0; + const retries = 2; + defer(() => range(0, 4 - index)).pipe( + mergeMap(() => { + index++; + if (index === 1 || index === 3) { + errors++; + return throwError('bad'); + } else { + return of(42); + } + }), + retry({count: retries - 1, resetOnSuccess: true}) + ).subscribe( + (x: number) => { + expect(x).to.equal(42); + }, + (err: any) => { + done("shouldn't error"); + }, () => { + expect(errors).to.equal(retries); + done(); + }); + }); + + it('should retry a number of times, then call next handler without error, then retry and error', (done: MochaDone) => { + let index = 0; + let errors = 0; + const retries = 2; + defer(() => range(0, 4 - index)).pipe( + mergeMap(() => { + index++; + if (index === 1 || index === 3) { + errors++; + return throwError('bad'); + } else { + return of(42); + } + }), + retry({count: retries - 1, resetOnSuccess: false}) + ).subscribe( + (x: number) => { + expect(x).to.equal(42); + }, + (err: any) => { + expect(errors).to.equal(retries); + done(); + }, () => { + done("shouldn't complete"); + }); + }); + it('should retry until successful completion', (done: MochaDone) => { let errors = 0; const retries = 10; diff --git a/src/internal/operators/retry.ts b/src/internal/operators/retry.ts index a6ce566ebf3..0252ae5dab7 100644 --- a/src/internal/operators/retry.ts +++ b/src/internal/operators/retry.ts @@ -4,6 +4,11 @@ import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +export interface RetryConfig { + count: number; + resetOnSuccess?: boolean; +} + /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable * calls `error`, this method will resubscribe to the source Observable for a maximum of `count` resubscriptions (given @@ -46,21 +51,33 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * ``` * * @param {number} count - Number of retry attempts before failing. + * @param {boolean} resetOnSuccess - When set to `true` every successful emission will reset the error count * @return {Observable} The source Observable modified with the retry logic. * @method retry * @owner Observable */ -export function retry(count: number = -1): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new RetryOperator(count, source)); +export function retry(count?: number): MonoTypeOperatorFunction; +export function retry(config: RetryConfig): MonoTypeOperatorFunction; +export function retry(configOrCount: number | RetryConfig = -1): MonoTypeOperatorFunction { + let config: RetryConfig; + if (configOrCount && typeof configOrCount === 'object') { + config = configOrCount as RetryConfig; + } else { + config = { + count: configOrCount as number + }; + } + return (source: Observable) => source.lift(new RetryOperator(config.count, !!config.resetOnSuccess, source)); } class RetryOperator implements Operator { constructor(private count: number, + private resetOnSuccess: boolean, private source: Observable) { } call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new RetrySubscriber(subscriber, this.count, this.source)); + return source.subscribe(new RetrySubscriber(subscriber, this.count, this.resetOnSuccess, this.source)); } } @@ -70,11 +87,24 @@ class RetryOperator implements Operator { * @extends {Ignored} */ class RetrySubscriber extends Subscriber { + private readonly initialCount: number; + constructor(destination: Subscriber, private count: number, - private source: Observable) { + private resetOnSuccess: boolean, + private source: Observable + ) { super(destination); + this.initialCount = this.count; } + + next(value?: T): void { + super.next(value); + if (this.resetOnSuccess) { + this.count = this.initialCount; + } + } + error(err: any) { if (!this.isStopped) { const { source, count } = this;