diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index f45efabc92..1bad2d36e5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -123,7 +123,15 @@ public void onError(Exception e) { @Override public void onNext(T args) { - if (predicate.call(args, counter.getAndIncrement())) { + Boolean isSelected; + try { + isSelected = predicate.call(args, counter.getAndIncrement()); + } + catch (Exception e) { + observer.onError(e); + return; + } + if (isSelected) { observer.onNext(args); } else { observer.onCompleted(); @@ -238,6 +246,35 @@ public Boolean call(String s) })).last(); } + @Test + public void testTakeWhileProtectsPredicateCall() { + TestObservable source = new TestObservable(mock(Subscription.class), "one"); + final RuntimeException testException = new RuntimeException("test exception"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Observable take = Observable.create(takeWhile(source, new Func1() + { + @Override + public Boolean call(String s) + { + throw testException; + } + })); + take.subscribe(aObserver); + + // wait for the Observable to complete + try { + source.t.join(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, times(1)).onError(testException); + } + @Test public void testUnsubscribeAfterTake() { Subscription s = mock(Subscription.class);