diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 7fbf369b799..1eed7dae828 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -31,6 +31,7 @@ import rx.functions.Func8; import rx.functions.Func9; import rx.internal.operators.OnSubscribeToObservableFuture; +import rx.internal.operators.OperatorDoOnEach; import rx.internal.operators.OperatorMap; import rx.internal.operators.OperatorObserveOn; import rx.internal.operators.OperatorOnErrorReturn; @@ -1789,4 +1790,39 @@ public final Single zipWith(Single other, Func2 + * In case the onError action throws, the downstream will receive a composite exception containing + * the original exception and the exception thrown by onError. + *

+ * + *

+ *
Scheduler:
+ *
{@code doOnError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onError + * the action to invoke if the source {@link Single} calls {@code onError} + * @return the source {@link Single} with the side-effecting behavior applied + * @see ReactiveX operators documentation: Do + */ + public final Single doOnError(final Action1 onError) { + Observer observer = new Observer() { + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onNext(T t) { + } + }; + + return lift(new OperatorDoOnEach(observer)); + } } diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 1efd1ae5a79..7c3a17ba611 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -15,6 +15,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import java.util.Arrays; import java.util.concurrent.CountDownLatch; @@ -27,6 +30,7 @@ import rx.Single.OnSubscribe; import rx.functions.Action0; +import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; import rx.observers.TestSubscriber; @@ -461,4 +465,40 @@ public void testToObservable() { ts.assertValue("a"); ts.assertCompleted(); } + + @Test + public void doOnErrorShouldNotCallActionIfNoErrorHasOccurred() { + Action1 action = mock(Action1.class); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .just("value") + .doOnError(action) + .subscribe(testSubscriber); + + testSubscriber.assertValue("value"); + testSubscriber.assertNoErrors(); + + verifyZeroInteractions(action); + } + + @Test + public void doOnErrorShouldCallActionIfErrorHasOccurred() { + Action1 action = mock(Action1.class); + + TestSubscriber testSubscriber = new TestSubscriber(); + + Throwable error = new IllegalStateException(); + + Single + .error(error) + .doOnError(action) + .subscribe(testSubscriber); + + testSubscriber.assertNoValues(); + testSubscriber.assertError(error); + + verify(action).call(error); + } }