Skip to content

Commit

Permalink
Merge pull request #3419 from artem-zinnatullin/single-do-on-error
Browse files Browse the repository at this point in the history
Add Single.doOnError()
  • Loading branch information
abersnaze committed Oct 8, 2015
2 parents 9204154 + 2d832a4 commit 61e1c22
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import rx.functions.Func8;
import rx.functions.Func9;
import rx.internal.operators.OnSubscribeToObservableFuture;
import rx.internal.operators.OperatorDoOnEach;
import rx.internal.operators.OperatorMap;
import rx.internal.operators.OperatorObserveOn;
import rx.internal.operators.OperatorOnErrorReturn;
Expand Down Expand Up @@ -1789,4 +1790,40 @@ public final <T2, R> Single<R> zipWith(Single<? extends T2> other, Func2<? super
return zip(this, other, zipFunction);
}

/**
* Modifies the source {@link Single} so that it invokes an action if it calls {@code onError}.
* <p>
* In case the onError action throws, the downstream will receive a composite exception containing
* the original exception and the exception thrown by onError.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onError
* the action to invoke if the source {@link Single} calls {@code onError}
* @return the source {@link Single} with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@Experimental
public final Single<T> doOnError(final Action1<Throwable> onError) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
onError.call(e);
}

@Override
public void onNext(T t) {
}
};

return lift(new OperatorDoOnEach<T>(observer));
}
}
69 changes: 69 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@
package rx;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
Expand All @@ -26,7 +31,9 @@
import org.junit.Test;

import rx.Single.OnSubscribe;
import rx.exceptions.CompositeException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -461,4 +468,66 @@ public void testToObservable() {
ts.assertValue("a");
ts.assertCompleted();
}

@Test
public void doOnErrorShouldNotCallActionIfNoErrorHasOccurred() {
Action1<Throwable> action = mock(Action1.class);

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.just("value")
.doOnError(action)
.subscribe(testSubscriber);

testSubscriber.assertValue("value");
testSubscriber.assertNoErrors();

verifyZeroInteractions(action);
}

@Test
public void doOnErrorShouldCallActionIfErrorHasOccurred() {
Action1<Throwable> action = mock(Action1.class);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Throwable error = new IllegalStateException();

Single
.error(error)
.doOnError(action)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(error);

verify(action).call(error);
}

@Test
public void doOnErrorShouldThrowCompositeExceptionIfOnErrorActionThrows() {
Action1<Throwable> action = mock(Action1.class);


Throwable error = new RuntimeException();
Throwable exceptionFromOnErrorAction = new IllegalStateException();
doThrow(exceptionFromOnErrorAction).when(action).call(error);

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Single
.error(error)
.doOnError(action)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
CompositeException compositeException = (CompositeException) testSubscriber.getOnErrorEvents().get(0);

assertEquals(2, compositeException.getExceptions().size());
assertSame(error, compositeException.getExceptions().get(0));
assertSame(exceptionFromOnErrorAction, compositeException.getExceptions().get(1));

verify(action).call(error);
}
}

0 comments on commit 61e1c22

Please sign in to comment.