diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 7fbf369b79..4324d32acf 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -31,6 +31,7 @@ import rx.functions.Func8; import rx.functions.Func9; import rx.internal.operators.OnSubscribeToObservableFuture; +import rx.internal.operators.OperatorDoOnEach; import rx.internal.operators.OperatorMap; import rx.internal.operators.OperatorObserveOn; import rx.internal.operators.OperatorOnErrorReturn; @@ -1789,4 +1790,40 @@ public final Single zipWith(Single other, Func2 + * In case the onError action throws, the downstream will receive a composite exception containing + * the original exception and the exception thrown by onError. + *

+ * + *

+ *
Scheduler:
+ *
{@code doOnError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onError + * the action to invoke if the source {@link Single} calls {@code onError} + * @return the source {@link Single} with the side-effecting behavior applied + * @see ReactiveX operators documentation: Do + */ + @Experimental + public final Single doOnError(final Action1 onError) { + Observer observer = new Observer() { + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onNext(T t) { + } + }; + + return lift(new OperatorDoOnEach(observer)); + } } diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 1efd1ae5a7..7d8fe2dc22 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -13,8 +13,13 @@ package rx; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import java.util.Arrays; import java.util.concurrent.CountDownLatch; @@ -26,7 +31,9 @@ import org.junit.Test; import rx.Single.OnSubscribe; +import rx.exceptions.CompositeException; import rx.functions.Action0; +import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; import rx.observers.TestSubscriber; @@ -461,4 +468,66 @@ public void testToObservable() { ts.assertValue("a"); ts.assertCompleted(); } + + @Test + public void doOnErrorShouldNotCallActionIfNoErrorHasOccurred() { + Action1 action = mock(Action1.class); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .just("value") + .doOnError(action) + .subscribe(testSubscriber); + + testSubscriber.assertValue("value"); + testSubscriber.assertNoErrors(); + + verifyZeroInteractions(action); + } + + @Test + public void doOnErrorShouldCallActionIfErrorHasOccurred() { + Action1 action = mock(Action1.class); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Throwable error = new IllegalStateException(); + + Single + .error(error) + .doOnError(action) + .subscribe(testSubscriber); + + testSubscriber.assertNoValues(); + testSubscriber.assertError(error); + + verify(action).call(error); + } + + @Test + public void doOnErrorShouldThrowCompositeExceptionIfOnErrorActionThrows() { + Action1 action = mock(Action1.class); + + + Throwable error = new RuntimeException(); + Throwable exceptionFromOnErrorAction = new IllegalStateException(); + doThrow(exceptionFromOnErrorAction).when(action).call(error); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .error(error) + .doOnError(action) + .subscribe(testSubscriber); + + testSubscriber.assertNoValues(); + CompositeException compositeException = (CompositeException) testSubscriber.getOnErrorEvents().get(0); + + assertEquals(2, compositeException.getExceptions().size()); + assertSame(error, compositeException.getExceptions().get(0)); + assertSame(exceptionFromOnErrorAction, compositeException.getExceptions().get(1)); + + verify(action).call(error); + } }