Skip to content

Commit

Permalink
Merge pull request #1271 from akarnokd/OperatorRetryWithPredicate
Browse files Browse the repository at this point in the history
Operator Retry with predicate
  • Loading branch information
benjchristensen committed May 28, 2014
2 parents dd52daf + 1c20ab0 commit 7e8fc57
Show file tree
Hide file tree
Showing 4 changed files with 408 additions and 6 deletions.
13 changes: 12 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5389,7 +5389,18 @@ public final Observable<T> retry() {
public final Observable<T> retry(int retryCount) {
return nest().lift(new OperatorRetry<T>(retryCount));
}

/**
* Returns an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError}
* and the predicate returns true for that specific exception and retry count.
* @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry
* count
* @return the Observable modified with retry logic
* @see #retry()
*/
public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
}

/**
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
* within periodic time intervals.
Expand Down
117 changes: 117 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorRetryWithPredicate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import rx.subscriptions.SerialSubscription;

public final class OperatorRetryWithPredicate<T> implements Observable.Operator<T, Observable<T>> {
final Func2<Integer, Throwable, Boolean> predicate;
public OperatorRetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
this.predicate = predicate;
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
child.add(inner);

final SerialSubscription serialSubscription = new SerialSubscription();
// add serialSubscription so it gets unsubscribed if child is unsubscribed
child.add(serialSubscription);

return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
}

static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
final Subscriber<? super T> child;
final Func2<Integer, Throwable, Boolean> predicate;
final Scheduler.Worker inner;
final SerialSubscription serialSubscription;

volatile int attempts;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");

public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
SerialSubscription serialSubscription) {
this.child = child;
this.predicate = predicate;
this.inner = inner;
this.serialSubscription = serialSubscription;
}


@Override
public void onCompleted() {
// ignore as we expect a single nested Observable<T>
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {

@Override
public void call() {
final Action0 _self = this;
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);

// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
Subscriber<T> subscriber = new Subscriber<T>() {

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
// retry again
inner.schedule(_self);
} else {
// give up and pass the failure
child.onError(e);
}
}

@Override
public void onNext(T v) {
child.onNext(v);
}

};
// register this Subscription (and unsubscribe previous if exists)
serialSubscription.set(subscriber);
o.unsafeSubscribe(subscriber);
}
});
}
}
}
11 changes: 6 additions & 5 deletions rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ public void call(Subscriber<? super String> s) {
assertEquals(4, subsCount.get()); // 1 + 3 retries
}

class SlowObservable implements Observable.OnSubscribe<Long> {
static final class SlowObservable implements Observable.OnSubscribe<Long> {

private AtomicInteger efforts = new AtomicInteger(0);
private AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
private AtomicInteger nextBeforeFailure;
final AtomicInteger efforts = new AtomicInteger(0);
final AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
final AtomicInteger nextBeforeFailure;

private final int emitDelay;

Expand All @@ -273,6 +273,7 @@ public SlowObservable(int emitDelay, int countNext) {
this.nextBeforeFailure = new AtomicInteger(countNext);
}

@Override
public void call(final Subscriber<? super Long> subscriber) {
final AtomicBoolean terminate = new AtomicBoolean(false);
efforts.getAndIncrement();
Expand Down Expand Up @@ -309,7 +310,7 @@ public void call() {
}

/** Observer for listener on seperate thread */
class AsyncObserver<T> implements Observer<T> {
static final class AsyncObserver<T> implements Observer<T> {

protected CountDownLatch latch = new CountDownLatch(1);

Expand Down
Loading

0 comments on commit 7e8fc57

Please sign in to comment.