diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index 00553eebb9..1ba5d1f281 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -211,26 +211,35 @@ public void call() { } Subscriber terminalDelegatingSubscriber = new Subscriber() { + boolean done; @Override public void onCompleted() { - currentProducer.set(null); - unsubscribe(); - terminals.onNext(Notification.createOnCompleted()); + if (!done) { + done = true; + currentProducer.set(null); + unsubscribe(); + terminals.onNext(Notification.createOnCompleted()); + } } @Override public void onError(Throwable e) { - currentProducer.set(null); - unsubscribe(); - terminals.onNext(Notification.createOnError(e)); + if (!done) { + done = true; + currentProducer.set(null); + unsubscribe(); + terminals.onNext(Notification.createOnError(e)); + } } @Override public void onNext(T v) { - if (consumerCapacity.get() != Long.MAX_VALUE) { - consumerCapacity.decrementAndGet(); + if (!done) { + if (consumerCapacity.get() != Long.MAX_VALUE) { + consumerCapacity.decrementAndGet(); + } + child.onNext(v); } - child.onNext(v); } @Override diff --git a/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java b/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java index 92eb34ca0f..24beeec2a0 100644 --- a/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java +++ b/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java @@ -84,26 +84,34 @@ public void call() { // new subscription each time so if it unsubscribes itself it does not prevent retries // by unsubscribing the child subscription Subscriber subscriber = new Subscriber() { - + boolean done; @Override public void onCompleted() { - child.onCompleted(); + if (!done) { + done = true; + child.onCompleted(); + } } @Override public void onError(Throwable e) { - if (predicate.call(attempts, e) && !inner.isUnsubscribed()) { - // retry again - inner.schedule(_self); - } else { - // give up and pass the failure - child.onError(e); + if (!done) { + done = true; + if (predicate.call(attempts, e) && !inner.isUnsubscribed()) { + // retry again + inner.schedule(_self); + } else { + // give up and pass the failure + child.onError(e); + } } } @Override public void onNext(T v) { - child.onNext(v); + if (!done) { + child.onNext(v); + } } }; diff --git a/src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java b/src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java index 008085dd01..ee4750829a 100644 --- a/src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java +++ b/src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java @@ -15,21 +15,23 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + import java.io.IOException; +import java.util.Collections; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertEquals; +import java.util.concurrent.atomic.*; + import org.junit.Test; import org.mockito.InOrder; -import static org.mockito.Mockito.*; -import rx.Observable; + +import rx.*; import rx.Observable.OnSubscribe; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; import rx.exceptions.TestException; -import rx.functions.Action1; -import rx.functions.Func2; +import rx.functions.*; +import rx.observers.TestSubscriber; import rx.subjects.PublishSubject; public class OperatorRetryWithPredicateTest { @@ -270,4 +272,37 @@ public void testTimeoutWithRetry() { assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get()); } + + @Test + public void testIssue2826() { + TestSubscriber ts = new TestSubscriber(); + final RuntimeException e = new RuntimeException("You shall not pass"); + final AtomicInteger c = new AtomicInteger(); + Observable.just(1).map(new Func1() { + @Override + public Integer call(Integer t1) { + c.incrementAndGet(); + throw e; + } + }).retry(retry5).subscribe(ts); + + ts.assertTerminalEvent(); + assertEquals(6, c.get()); + assertEquals(Collections.singletonList(e), ts.getOnErrorEvents()); + } + @Test + public void testJustAndRetry() throws Exception { + final AtomicBoolean throwException = new AtomicBoolean(true); + int value = Observable.just(1).map(new Func1() { + @Override + public Integer call(Integer t1) { + if (throwException.compareAndSet(true, false)) { + throw new TestException(); + } + return t1; + } + }).retry(1).toBlocking().single(); + + assertEquals(1, value); + } }