Skip to content

Commit

Permalink
Add Single.doOnError()
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-zinnatullin committed Oct 8, 2015
1 parent 29ce486 commit 3f6e2db
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
36 changes: 36 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import rx.functions.Func8;
import rx.functions.Func9;
import rx.internal.operators.OnSubscribeToObservableFuture;
import rx.internal.operators.OperatorDoOnEach;
import rx.internal.operators.OperatorMap;
import rx.internal.operators.OperatorObserveOn;
import rx.internal.operators.OperatorOnErrorReturn;
Expand Down Expand Up @@ -1789,4 +1790,39 @@ public final <T2, R> Single<R> zipWith(Single<? extends T2> other, Func2<? super
return zip(this, other, zipFunction);
}

/**
* Modifies the source {@link Single} so that it invokes an action if it calls {@code onError}.
* <p>
* In case the onError action throws, the downstream will receive a composite exception containing
* the original exception and the exception thrown by onError.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onError
* the action to invoke if the source {@link Single} calls {@code onError}
* @return the source {@link Single} with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
public final Single<T> doOnError(final Action1<Throwable> onError) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
onError.call(e);
}

@Override
public void onNext(T t) {
}
};

return lift(new OperatorDoOnEach<T>(observer));
}
}
40 changes: 40 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
Expand All @@ -27,6 +30,7 @@

import rx.Single.OnSubscribe;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -461,4 +465,40 @@ public void testToObservable() {
ts.assertValue("a");
ts.assertCompleted();
}

@Test
public void doOnErrorShouldNotCallActionIfNoErrorHasOccurred() {
Action1<Throwable> action = mock(Action1.class);

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.just("value")
.doOnError(action)
.subscribe(testSubscriber);

testSubscriber.assertValue("value");
testSubscriber.assertNoErrors();

verifyZeroInteractions(action);
}

@Test
public void doOnErrorShouldCallActionIfErrorHasOccurred() {
Action1<Throwable> action = mock(Action1.class);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Throwable error = new IllegalStateException();

Single
.error(error)
.doOnError(action)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(error);

verify(action).call(error);
}
}

0 comments on commit 3f6e2db

Please sign in to comment.