Skip to content

Commit

Permalink
Wraps DoOnEach in a SafeObserver
Browse files Browse the repository at this point in the history
This commit leverages the SafeObserver facility to get the desired
behavior in the face of exceptions.  Specifically, if any of the
operations performed within the doOnEach handler raises an exception,
that exception will propagate through the observable chain.
  • Loading branch information
nullstyle committed Nov 11, 2013
1 parent b39d032 commit b575042
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
16 changes: 8 additions & 8 deletions rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,24 @@
* Converts the elements of an observable sequence to the specified type.
*/
public class OperationDoOnEach {
public static <T> OnSubscribeFunc<T> doOnEach(Observable<? extends T> source, Observer<? super T> observer) {
return new DoOnEachObservable<T>(source, observer);
public static <T> OnSubscribeFunc<T> doOnEach(Observable<? extends T> sequence, Observer<? super T> observer) {
return new DoOnEachObservable<T>(sequence, observer);
}

private static class DoOnEachObservable<T> implements OnSubscribeFunc<T> {

private final Observable<? extends T> source;
private final Observable<? extends T> sequence;
private final Observer<? super T> doOnEachObserver;

public DoOnEachObservable(Observable<? extends T> source, Observer<? super T> doOnEachObserver) {
this.source = source;
public DoOnEachObservable(Observable<? extends T> sequence, Observer<? super T> doOnEachObserver) {
this.sequence = sequence;
this.doOnEachObserver = doOnEachObserver;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
return source.subscribe(new Observer<T>() {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(sequence.subscribe(new SafeObserver<T>(subscription, new Observer<T>() {
@Override
public void onCompleted() {
doOnEachObserver.onCompleted();
Expand All @@ -58,8 +59,7 @@ public void onNext(T value) {
doOnEachObserver.onNext(value);
observer.onNext(value);
}

});
})));
}

}
Expand Down
21 changes: 21 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import rx.concurrency.Schedulers;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Action1;

public class OperationDoOnEachTest {

Expand Down Expand Up @@ -104,5 +105,25 @@ public String call(String s) {
verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
}

@Test
public void testDoOnEachWithErrorInCallback() {
Observable<String> base = Observable.from("one", "two", "fail", "three");
Observable<String> doOnEach = base.doOnEach(new Action1<String>() {
@Override
public void call(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
}
});

doOnEach.subscribe(subscribedObserver);
verify(subscribedObserver, times(1)).onNext("one");
verify(subscribedObserver, times(1)).onNext("two");
verify(subscribedObserver, never()).onNext("three");
verify(subscribedObserver, never()).onCompleted();
verify(subscribedObserver, times(1)).onError(any(Throwable.class));

}

}

0 comments on commit b575042

Please sign in to comment.