From b57504252a7f1d351ccb75c3f0ee35849dec753c Mon Sep 17 00:00:00 2001 From: Scott Fleckenstein Date: Mon, 11 Nov 2013 15:01:09 -0800 Subject: [PATCH] Wraps DoOnEach in a SafeObserver This commit leverages the SafeObserver facility to get the desired behavior in the face of exceptions. Specifically, if any of the operations performed within the doOnEach handler raises an exception, that exception will propagate through the observable chain. --- .../java/rx/operators/OperationDoOnEach.java | 16 +++++++------- .../rx/operators/OperationDoOnEachTest.java | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java index acd841fc2f..1b0aafb578 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java @@ -24,23 +24,24 @@ * Converts the elements of an observable sequence to the specified type. */ public class OperationDoOnEach { - public static OnSubscribeFunc doOnEach(Observable source, Observer observer) { - return new DoOnEachObservable(source, observer); + public static OnSubscribeFunc doOnEach(Observable sequence, Observer observer) { + return new DoOnEachObservable(sequence, observer); } private static class DoOnEachObservable implements OnSubscribeFunc { - private final Observable source; + private final Observable sequence; private final Observer doOnEachObserver; - public DoOnEachObservable(Observable source, Observer doOnEachObserver) { - this.source = source; + public DoOnEachObservable(Observable sequence, Observer doOnEachObserver) { + this.sequence = sequence; this.doOnEachObserver = doOnEachObserver; } @Override public Subscription onSubscribe(final Observer observer) { - return source.subscribe(new Observer() { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(sequence.subscribe(new SafeObserver(subscription, new Observer() { @Override public void onCompleted() { doOnEachObserver.onCompleted(); @@ -58,8 +59,7 @@ public void onNext(T value) { doOnEachObserver.onNext(value); observer.onNext(value); } - - }); + }))); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java index 4bf017761c..6c1407ebea 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java @@ -36,6 +36,7 @@ import rx.concurrency.Schedulers; import rx.util.functions.Func1; import rx.util.functions.Func2; +import rx.util.functions.Action1; public class OperationDoOnEachTest { @@ -104,5 +105,25 @@ public String call(String s) { verify(sideEffectObserver, times(1)).onError(any(Throwable.class)); } + @Test + public void testDoOnEachWithErrorInCallback() { + Observable base = Observable.from("one", "two", "fail", "three"); + Observable doOnEach = base.doOnEach(new Action1() { + @Override + public void call(String s) { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + } + }); + + doOnEach.subscribe(subscribedObserver); + verify(subscribedObserver, times(1)).onNext("one"); + verify(subscribedObserver, times(1)).onNext("two"); + verify(subscribedObserver, never()).onNext("three"); + verify(subscribedObserver, never()).onCompleted(); + verify(subscribedObserver, times(1)).onError(any(Throwable.class)); + + } }