Skip to content

Commit

Permalink
Merge pull request #3129 from akarnokd/RetryPredicateFix
Browse files Browse the repository at this point in the history
Fix retry with predicate ignoring backpressure.
  • Loading branch information
akarnokd committed Aug 6, 2015
2 parents a3a0b1f + d000c10 commit d31d46a
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 54 deletions.
2 changes: 2 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6576,6 +6576,8 @@ public final Observable<T> retry(final long count) {
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator honors backpressure.</td>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
Expand Down
116 changes: 65 additions & 51 deletions src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Observable;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func2;
import rx.internal.producers.ProducerArbiter;
import rx.schedulers.Schedulers;
import rx.subscriptions.SerialSubscription;

Expand All @@ -38,88 +41,99 @@ public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child)
final SerialSubscription serialSubscription = new SerialSubscription();
// add serialSubscription so it gets unsubscribed if child is unsubscribed
child.add(serialSubscription);

return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
ProducerArbiter pa = new ProducerArbiter();
child.setProducer(pa);
return new SourceSubscriber<T>(child, predicate, inner, serialSubscription, pa);
}

static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
final Subscriber<? super T> child;
final Func2<Integer, Throwable, Boolean> predicate;
final Scheduler.Worker inner;
final SerialSubscription serialSubscription;
final ProducerArbiter pa;

volatile int attempts;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");

public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
SerialSubscription serialSubscription) {
public SourceSubscriber(Subscriber<? super T> child,
final Func2<Integer, Throwable, Boolean> predicate,
Scheduler.Worker inner,
SerialSubscription serialSubscription,
ProducerArbiter pa) {
this.child = child;
this.predicate = predicate;
this.inner = inner;
this.serialSubscription = serialSubscription;
this.pa = pa;
}


@Override
public void onCompleted() {
// ignore as we expect a single nested Observable<T>
}
public void onCompleted() {
// ignore as we expect a single nested Observable<T>
}

@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {
@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {

@Override
public void call() {
final Action0 _self = this;
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
@Override
public void call() {
final Action0 _self = this;
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);

// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
Subscriber<T> subscriber = new Subscriber<T>() {
boolean done;
@Override
public void onCompleted() {
if (!done) {
done = true;
child.onCompleted();
}
// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
Subscriber<T> subscriber = new Subscriber<T>() {
boolean done;
@Override
public void onCompleted() {
if (!done) {
done = true;
child.onCompleted();
}
}

@Override
public void onError(Throwable e) {
if (!done) {
done = true;
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
// retry again
inner.schedule(_self);
} else {
// give up and pass the failure
child.onError(e);
}
@Override
public void onError(Throwable e) {
if (!done) {
done = true;
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
// retry again
inner.schedule(_self);
} else {
// give up and pass the failure
child.onError(e);
}
}
}

@Override
public void onNext(T v) {
if (!done) {
child.onNext(v);
}
@Override
public void onNext(T v) {
if (!done) {
child.onNext(v);
pa.produced(1);
}
}

};
// register this Subscription (and unsubscribe previous if exists)
serialSubscription.set(subscriber);
o.unsafeSubscribe(subscriber);
}
});
}
@Override
public void setProducer(Producer p) {
pa.setProducer(p);
}
};
// register this Subscription (and unsubscribe previous if exists)
serialSubscription.set(subscriber);
o.unsafeSubscribe(subscriber);
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;

import rx.*;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.TestException;
import rx.functions.*;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

Expand Down Expand Up @@ -360,4 +367,32 @@ public void call(Long t) {
}});
assertEquals(Arrays.asList(1L,1L,2L,3L), list);
}
@Test
public void testBackpressure() {
final List<Long> requests = new ArrayList<Long>();

Observable<Integer> source = Observable
.just(1)
.concatWith(Observable.<Integer>error(new TestException()))
.doOnRequest(new Action1<Long>() {
@Override
public void call(Long t) {
requests.add(t);
}
});

TestSubscriber<Integer> ts = TestSubscriber.create(3);
source
.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer t1, Throwable t2) {
return t1 < 3;
}
}).subscribe(ts);

assertEquals(Arrays.asList(3L, 2L, 1L), requests);
ts.assertValues(1, 1, 1);
ts.assertNotCompleted();
ts.assertNoErrors();
}
}

0 comments on commit d31d46a

Please sign in to comment.