From d000c1035b5a4134b12c7ef5198aa8487451e54e Mon Sep 17 00:00:00 2001
From: akarnokd <akarnokd@akarnokd-PC2>
Date: Mon, 3 Aug 2015 21:26:22 +0200
Subject: [PATCH] Fix retry with predicate ignoring backpressure.

---
 src/main/java/rx/Observable.java              |   2 +
 .../operators/OperatorRetryWithPredicate.java | 116 ++++++++++--------
 .../OperatorRetryWithPredicateTest.java       |  41 ++++++-
 3 files changed, 105 insertions(+), 54 deletions(-)

diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java
index 0aafecbf79..a6432a89a0 100644
--- a/src/main/java/rx/Observable.java
+++ b/src/main/java/rx/Observable.java
@@ -6576,6 +6576,8 @@ public final Observable<T> retry(final long count) {
      * <p>
      * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
      * <dl>
+     *  <dt><b>Backpressure Support:</b></dt>
+     *  <dd>This operator honors backpressure.</td>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
      * </dl>
diff --git a/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java b/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java
index 24beeec2a0..bdfcd3dbeb 100644
--- a/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java
+++ b/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java
@@ -16,11 +16,14 @@
 package rx.internal.operators;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
 import rx.Observable;
+import rx.Producer;
 import rx.Scheduler;
 import rx.Subscriber;
 import rx.functions.Action0;
 import rx.functions.Func2;
+import rx.internal.producers.ProducerArbiter;
 import rx.schedulers.Schedulers;
 import rx.subscriptions.SerialSubscription;
 
@@ -38,8 +41,9 @@ public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child)
         final SerialSubscription serialSubscription = new SerialSubscription();
         // add serialSubscription so it gets unsubscribed if child is unsubscribed
         child.add(serialSubscription);
-        
-        return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
+        ProducerArbiter pa = new ProducerArbiter();
+        child.setProducer(pa);
+        return new SourceSubscriber<T>(child, predicate, inner, serialSubscription, pa);
     }
     
     static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
@@ -47,79 +51,89 @@ static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
         final Func2<Integer, Throwable, Boolean> predicate;
         final Scheduler.Worker inner;
         final SerialSubscription serialSubscription;
+        final ProducerArbiter pa;
         
         volatile int attempts;
         @SuppressWarnings("rawtypes")
         static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
                 = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
 
-        public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner, 
-                SerialSubscription serialSubscription) {
+        public SourceSubscriber(Subscriber<? super T> child, 
+                final Func2<Integer, Throwable, Boolean> predicate, 
+                Scheduler.Worker inner, 
+                SerialSubscription serialSubscription,
+                ProducerArbiter pa) {
             this.child = child;
             this.predicate = predicate;
             this.inner = inner;
             this.serialSubscription = serialSubscription;
+            this.pa = pa;
         }
         
         
         @Override
-            public void onCompleted() {
-                // ignore as we expect a single nested Observable<T>
-            }
+        public void onCompleted() {
+            // ignore as we expect a single nested Observable<T>
+        }
 
-            @Override
-            public void onError(Throwable e) {
-                child.onError(e);
-            }
+        @Override
+        public void onError(Throwable e) {
+            child.onError(e);
+        }
 
-            @Override
-            public void onNext(final Observable<T> o) {
-                inner.schedule(new Action0() {
+        @Override
+        public void onNext(final Observable<T> o) {
+            inner.schedule(new Action0() {
 
-                    @Override
-                    public void call() {
-                        final Action0 _self = this;
-                        ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
+                @Override
+                public void call() {
+                    final Action0 _self = this;
+                    ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
 
-                        // new subscription each time so if it unsubscribes itself it does not prevent retries
-                        // by unsubscribing the child subscription
-                        Subscriber<T> subscriber = new Subscriber<T>() {
-                            boolean done;
-                            @Override
-                            public void onCompleted() {
-                                if (!done) {
-                                    done = true;
-                                    child.onCompleted();
-                                }
+                    // new subscription each time so if it unsubscribes itself it does not prevent retries
+                    // by unsubscribing the child subscription
+                    Subscriber<T> subscriber = new Subscriber<T>() {
+                        boolean done;
+                        @Override
+                        public void onCompleted() {
+                            if (!done) {
+                                done = true;
+                                child.onCompleted();
                             }
+                        }
 
-                            @Override
-                            public void onError(Throwable e) {
-                                if (!done) {
-                                    done = true;
-                                    if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
-                                        // retry again
-                                        inner.schedule(_self);
-                                    } else {
-                                        // give up and pass the failure
-                                        child.onError(e);
-                                    }
+                        @Override
+                        public void onError(Throwable e) {
+                            if (!done) {
+                                done = true;
+                                if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
+                                    // retry again
+                                    inner.schedule(_self);
+                                } else {
+                                    // give up and pass the failure
+                                    child.onError(e);
                                 }
                             }
+                        }
 
-                            @Override
-                            public void onNext(T v) {
-                                if (!done) {
-                                    child.onNext(v);
-                                }
+                        @Override
+                        public void onNext(T v) {
+                            if (!done) {
+                                child.onNext(v);
+                                pa.produced(1);
                             }
+                        }
 
-                        };
-                        // register this Subscription (and unsubscribe previous if exists) 
-                        serialSubscription.set(subscriber);
-                        o.unsafeSubscribe(subscriber);
-                    }
-                });
-            }
+                        @Override
+                        public void setProducer(Producer p) {
+                            pa.setProducer(p);
+                        }
+                    };
+                    // register this Subscription (and unsubscribe previous if exists) 
+                    serialSubscription.set(subscriber);
+                    o.unsafeSubscribe(subscriber);
+                }
+            });
+        }
     }
 }
diff --git a/src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java b/src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java
index 76461e3ddf..df878de13a 100644
--- a/src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java
+++ b/src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java
@@ -20,20 +20,27 @@
 import static org.mockito.Mockito.*;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Test;
 import org.mockito.InOrder;
 
-import rx.*;
+import rx.Observable;
 import rx.Observable.OnSubscribe;
+import rx.Observer;
+import rx.Subscriber;
+import rx.Subscription;
 import rx.exceptions.TestException;
-import rx.functions.*;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.functions.Func2;
 import rx.observers.TestSubscriber;
 import rx.subjects.PublishSubject;
 
@@ -360,4 +367,32 @@ public void call(Long t) {
             }});
         assertEquals(Arrays.asList(1L,1L,2L,3L), list);
     }
+    @Test
+    public void testBackpressure() {
+        final List<Long> requests = new ArrayList<Long>();
+        
+        Observable<Integer> source = Observable
+                .just(1)
+                .concatWith(Observable.<Integer>error(new TestException()))
+                .doOnRequest(new Action1<Long>() {
+                    @Override
+                    public void call(Long t) {
+                        requests.add(t);
+                    }
+                });
+        
+        TestSubscriber<Integer> ts = TestSubscriber.create(3);
+        source
+        .retry(new Func2<Integer, Throwable, Boolean>() {
+            @Override
+            public Boolean call(Integer t1, Throwable t2) {
+                return t1 < 3;
+            }
+        }).subscribe(ts);
+        
+        assertEquals(Arrays.asList(3L, 2L, 1L), requests);
+        ts.assertValues(1, 1, 1);
+        ts.assertNotCompleted();
+        ts.assertNoErrors();
+    }
 }