diff --git a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java index acd841fc2f..1b0aafb578 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java @@ -24,23 +24,24 @@ * Converts the elements of an observable sequence to the specified type. */ public class OperationDoOnEach { - public static OnSubscribeFunc doOnEach(Observable source, Observer observer) { - return new DoOnEachObservable(source, observer); + public static OnSubscribeFunc doOnEach(Observable sequence, Observer observer) { + return new DoOnEachObservable(sequence, observer); } private static class DoOnEachObservable implements OnSubscribeFunc { - private final Observable source; + private final Observable sequence; private final Observer doOnEachObserver; - public DoOnEachObservable(Observable source, Observer doOnEachObserver) { - this.source = source; + public DoOnEachObservable(Observable sequence, Observer doOnEachObserver) { + this.sequence = sequence; this.doOnEachObserver = doOnEachObserver; } @Override public Subscription onSubscribe(final Observer observer) { - return source.subscribe(new Observer() { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(sequence.subscribe(new SafeObserver(subscription, new Observer() { @Override public void onCompleted() { doOnEachObserver.onCompleted(); @@ -58,8 +59,7 @@ public void onNext(T value) { doOnEachObserver.onNext(value); observer.onNext(value); } - - }); + }))); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java index 4bf017761c..6c1407ebea 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java @@ -36,6 +36,7 @@ import rx.concurrency.Schedulers; import rx.util.functions.Func1; import rx.util.functions.Func2; +import rx.util.functions.Action1; public class OperationDoOnEachTest { @@ -104,5 +105,25 @@ public String call(String s) { verify(sideEffectObserver, times(1)).onError(any(Throwable.class)); } + @Test + public void testDoOnEachWithErrorInCallback() { + Observable base = Observable.from("one", "two", "fail", "three"); + Observable doOnEach = base.doOnEach(new Action1() { + @Override + public void call(String s) { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + } + }); + + doOnEach.subscribe(subscribedObserver); + verify(subscribedObserver, times(1)).onNext("one"); + verify(subscribedObserver, times(1)).onNext("two"); + verify(subscribedObserver, never()).onNext("three"); + verify(subscribedObserver, never()).onCompleted(); + verify(subscribedObserver, times(1)).onError(any(Throwable.class)); + + } }