Skip to content

Commit

Permalink
Merge pull request #2852 from akarnokd/RetryOldSubscribeFix
Browse files Browse the repository at this point in the history
Change retryWhen to eagerly ignore an error'd source's subsequent events
  • Loading branch information
benjchristensen committed Apr 2, 2015
2 parents a6a6a33 + 0a6e26d commit 9f2fc67
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 27 deletions.
27 changes: 18 additions & 9 deletions src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,26 +211,35 @@ public void call() {
}

Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
boolean done;
@Override
public void onCompleted() {
currentProducer.set(null);
unsubscribe();
terminals.onNext(Notification.createOnCompleted());
if (!done) {
done = true;
currentProducer.set(null);
unsubscribe();
terminals.onNext(Notification.createOnCompleted());
}
}

@Override
public void onError(Throwable e) {
currentProducer.set(null);
unsubscribe();
terminals.onNext(Notification.createOnError(e));
if (!done) {
done = true;
currentProducer.set(null);
unsubscribe();
terminals.onNext(Notification.createOnError(e));
}
}

@Override
public void onNext(T v) {
if (consumerCapacity.get() != Long.MAX_VALUE) {
consumerCapacity.decrementAndGet();
if (!done) {
if (consumerCapacity.get() != Long.MAX_VALUE) {
consumerCapacity.decrementAndGet();
}
child.onNext(v);
}
child.onNext(v);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,26 +84,34 @@ public void call() {
// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
Subscriber<T> subscriber = new Subscriber<T>() {

boolean done;
@Override
public void onCompleted() {
child.onCompleted();
if (!done) {
done = true;
child.onCompleted();
}
}

@Override
public void onError(Throwable e) {
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
// retry again
inner.schedule(_self);
} else {
// give up and pass the failure
child.onError(e);
if (!done) {
done = true;
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
// retry again
inner.schedule(_self);
} else {
// give up and pass the failure
child.onError(e);
}
}
}

@Override
public void onNext(T v) {
child.onNext(v);
if (!done) {
child.onNext(v);
}
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.*;

import org.junit.Test;
import org.mockito.InOrder;
import static org.mockito.Mockito.*;
import rx.Observable;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.TestException;
import rx.functions.Action1;
import rx.functions.Func2;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

public class OperatorRetryWithPredicateTest {
Expand Down Expand Up @@ -270,4 +272,37 @@ public void testTimeoutWithRetry() {

assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
}

@Test
public void testIssue2826() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
final RuntimeException e = new RuntimeException("You shall not pass");
final AtomicInteger c = new AtomicInteger();
Observable.just(1).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer t1) {
c.incrementAndGet();
throw e;
}
}).retry(retry5).subscribe(ts);

ts.assertTerminalEvent();
assertEquals(6, c.get());
assertEquals(Collections.singletonList(e), ts.getOnErrorEvents());
}
@Test
public void testJustAndRetry() throws Exception {
final AtomicBoolean throwException = new AtomicBoolean(true);
int value = Observable.just(1).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer t1) {
if (throwException.compareAndSet(true, false)) {
throw new TestException();
}
return t1;
}
}).retry(1).toBlocking().single();

assertEquals(1, value);
}
}

0 comments on commit 9f2fc67

Please sign in to comment.