diff --git a/spec/operators/retryWhen-spec.js b/spec/operators/retryWhen-spec.js new file mode 100644 index 0000000000..419431f1ab --- /dev/null +++ b/spec/operators/retryWhen-spec.js @@ -0,0 +1,56 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.retryWhen()', function () { + it('should retry when notified via returned notifier on thrown error', function (done) { + var retried = false; + var expected = [1, 2, 1, 2]; + var i = 0; + Observable.of(1, 2, 3) + .map(function (n) { + if (n === 3) { + throw 'bad'; + } + return n; + }) + .retryWhen(function (errors) { + return errors.map(function (x) { + expect(x).toBe('bad'); + if (retried) { + throw 'done'; + } + retried = true; + return x; + }); + }) + .subscribe(function (x) { + expect(x).toBe(expected[i++]); + }, + function (err) { + expect(err).toBe('done'); + done(); + }) + }); + + it('should retry when notified and complete on returned completion', function (done) { + var expected = [1, 2, 1, 2]; + Observable.of(1, 2, 3) + .map(function (n) { + if (n === 3) { + throw 'bad'; + } + return n; + }) + .retryWhen(function (errors) { + return errors.take(1); + }) + .subscribe(function (n) { + expect(n).toBe(expected.shift()); + }, function (err) { + throw 'error should not be called'; + }, function () { + done(); + }); + }); +}); \ No newline at end of file diff --git a/spec/subject-spec.js b/spec/subject-spec.js index 74fc77576f..4e7072fb4e 100644 --- a/spec/subject-spec.js +++ b/spec/subject-spec.js @@ -70,7 +70,6 @@ describe('Subject', function () { }); it('should clean out unsubscribed subscribers', function (done) { - debugger; var subject = new Subject(); var sub1 = subject.subscribe(function (x) { diff --git a/src/Observable.ts b/src/Observable.ts index 0736996e24..52bb680a2a 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -137,4 +137,5 @@ export default class Observable { multicast: (subjectFactory: () => Subject) => ConnectableObservable; catch: (selector: (err: any, source: Observable, caught: Observable) => Observable) => Observable; + retryWhen: (notifier: (errors: Observable) => Observable) => Observable; } diff --git a/src/Rx.ts b/src/Rx.ts index f424790ab5..da6e2a3ef6 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -126,8 +126,10 @@ import partition from './operators/partition'; observableProto.partition = partition; import _catch from './operators/catch'; +import retryWhen from './operators/retryWhen'; observableProto.catch = _catch; +observableProto.retryWhen = retryWhen; export default { Subject, diff --git a/src/operators/retryWhen.ts b/src/operators/retryWhen.ts new file mode 100644 index 0000000000..017dd37648 --- /dev/null +++ b/src/operators/retryWhen.ts @@ -0,0 +1,74 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; +import Subscription from '../Subscription'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; + +export default function retryWhen(notifier: (errors:Observable) => Observable) { + return this.lift(new RetryWhenOperator(notifier, this)); +} + +export class RetryWhenOperator extends Operator { + constructor(protected notifier: (errors: Observable) => Observable, protected original:Observable) { + super(); + } + + call(observer: Observer): Observer { + return new RetryWhenSubscriber(observer, this.notifier, this.original); + } +} + +export class RetryWhenSubscriber extends Subscriber { + errors: Subject; + retryNotifications: Observable; + retryNotificationSubscription: Subscription; + + constructor(destination: Observer, public notifier: (errors: Observable) => Observable, public original: Observable) { + super(destination); + } + + _error(err: any) { + if (!this.retryNotifications) { + this.errors = new Subject(); + const notifications = tryCatch(this.notifier).call(this, this.errors); + if (notifications === errorObject) { + this.destination.error(errorObject.e); + } else { + this.retryNotifications = notifications; + this.retryNotificationSubscription = notifications.subscribe(new RetryNotificationSubscriber(this)); + this.add(this.retryNotificationSubscription); + } + } + this.errors.next(err); + } + + finalError(err: any) { + this.destination.error(err); + } + + resubscribe() { + this.original.subscribe(this); + } +} + +export class RetryNotificationSubscriber extends Subscriber { + constructor(public parent: RetryWhenSubscriber) { + super(null); + } + + _next(value: T) { + this.parent.resubscribe(); + } + + _error(err: any) { + this.parent.finalError(err); + } + + _complete() { + this.parent.complete(); + } +} \ No newline at end of file