diff --git a/rxjava-core/src/main/java/rx/Notification.java b/rxjava-core/src/main/java/rx/Notification.java index 71f2a3a63c5..f1dbef8ad94 100644 --- a/rxjava-core/src/main/java/rx/Notification.java +++ b/rxjava-core/src/main/java/rx/Notification.java @@ -23,7 +23,7 @@ public class Notification { private final Kind kind; - private final Exception exception; + private final Throwable exception; private final T value; /** @@ -44,7 +44,7 @@ public Notification(T value) { * @param exception * The exception passed to the onError notification. */ - public Notification(Exception exception) { + public Notification(Throwable exception) { this.exception = exception; this.value = null; this.kind = Kind.OnError; @@ -62,9 +62,9 @@ public Notification() { /** * Retrieves the exception associated with an onError notification. * - * @return The exception associated with an onError notification. + * @return Throwable associated with an onError notification. */ - public Exception getException() { + public Throwable getThrowable() { return exception; } @@ -126,7 +126,7 @@ public String toString() { if (hasValue()) str.append(" ").append(getValue()); if (hasException()) - str.append(" ").append(getException().getMessage()); + str.append(" ").append(getThrowable().getMessage()); str.append("]"); return str.toString(); } @@ -137,7 +137,7 @@ public int hashCode() { if (hasValue()) hash = hash * 31 + getValue().hashCode(); if (hasException()) - hash = hash * 31 + getException().hashCode(); + hash = hash * 31 + getThrowable().hashCode(); return hash; } @@ -154,7 +154,7 @@ public boolean equals(Object obj) { return false; if (hasValue() && !getValue().equals(notification.getValue())) return false; - if (hasException() && !getException().equals(notification.getException())) + if (hasException() && !getThrowable().equals(notification.getThrowable())) return false; return true; } diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index aac823a5cbe..876ff5de65c 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -205,14 +205,14 @@ public Subscription subscribe(Observer observer) { } catch (OnErrorNotImplementedException e) { // special handling when onError is not implemented ... we just rethrow throw e; - } catch (Exception e) { + } catch (Throwable e) { // if an unhandled error occurs executing the onSubscribe we will propagate it try { observer.onError(hook.onSubscribeError(this, e)); } catch (OnErrorNotImplementedException e2) { // special handling when onError is not implemented ... we just rethrow throw e2; - } catch (Exception e2) { + } catch (Throwable e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); @@ -295,7 +295,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { handleError(e); Object onError = callbacks.get("onError"); if (onError != null) { @@ -344,7 +344,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { handleError(e); throw new OnErrorNotImplementedException(e); } @@ -379,7 +379,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { handleError(e); throw new OnErrorNotImplementedException(e); } @@ -421,7 +421,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { handleError(e); Functions.from(onError).call(e); } @@ -438,7 +438,7 @@ public Subscription subscribe(final Object onNext, final Object onError, Schedul return subscribeOn(scheduler).subscribe(onNext, onError); } - public Subscription subscribe(final Action1 onNext, final Action1 onError) { + public Subscription subscribe(final Action1 onNext, final Action1 onError) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } @@ -459,7 +459,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { handleError(e); onError.call(e); } @@ -472,7 +472,7 @@ public void onNext(T args) { }); } - public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { + public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError); } @@ -504,7 +504,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { handleError(e); Functions.from(onError).call(e); } @@ -521,7 +521,7 @@ public Subscription subscribe(final Object onNext, final Object onError, final O return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); } - public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { + public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } @@ -545,7 +545,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { handleError(e); onError.call(e); } @@ -558,7 +558,7 @@ public void onNext(T args) { }); } - public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { + public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); } @@ -583,7 +583,7 @@ public ConnectableObservable multicast(Subject subject) { * * @param e */ - private void handleError(Exception e) { + private void handleError(Throwable e) { // onError should be rare so we'll only fetch when needed RxJavaPlugins.getInstance().getErrorHandler().handleError(e); } @@ -618,7 +618,7 @@ public Subscription call(Observer t1) { */ private static class ThrowObservable extends Observable { - public ThrowObservable(final Exception exception) { + public ThrowObservable(final Throwable exception) { super(new Func1, Subscription>() { /** @@ -956,7 +956,7 @@ public static Observable empty() { * @return an Observable that invokes the {@link Observer}'s * {@link Observer#onError onError} method when the Observer subscribes to it */ - public static Observable error(Exception exception) { + public static Observable error(Throwable exception) { return new ThrowObservable(exception); } @@ -1767,7 +1767,7 @@ public static Observable never() { * encounters an error * @return an Observable, identical to the source Observable with its behavior modified as described */ - public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) { + public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) { return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); } @@ -1800,11 +1800,11 @@ public static Observable onErrorResumeNext(final Observable that, fina public static Observable onErrorResumeNext(final Observable that, final Object resumeFunction) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(resumeFunction); - return onErrorResumeNext(that, new Func1>() { + return onErrorResumeNext(that, new Func1>() { @SuppressWarnings("unchecked") @Override - public Observable call(Exception e) { + public Observable call(Throwable e) { return (Observable) _f.call(e); } }); @@ -1866,7 +1866,7 @@ public static Observable onErrorResumeNext(final Observable that, fina * would otherwise cause it to invoke {@link Observer#onError onError} * @return an Observable, identical to the source Observable with its behavior modified as described */ - public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { + public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); } @@ -3537,7 +3537,7 @@ public Observable observeOn(Scheduler scheduler) { * @return an Observable that emits the items and notifications embedded in the * {@link Notification} objects emitted by the source Observable * @see MSDN: Observable.dematerialize - * @throws Exception + * @throws Throwable * if the source Observable is not of type {@code Observable>}. */ @SuppressWarnings("unchecked") @@ -3571,7 +3571,7 @@ public Observable dematerialize() { * encounters an error * @return the original Observable, with appropriately modified behavior */ - public Observable onErrorResumeNext(final Func1> resumeFunction) { + public Observable onErrorResumeNext(final Func1> resumeFunction) { return onErrorResumeNext(this, resumeFunction); } @@ -3601,11 +3601,11 @@ public Observable onErrorResumeNext(final Func1> res public Observable onErrorResumeNext(final Object resumeFunction) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(resumeFunction); - return onErrorResumeNext(this, new Func1>() { + return onErrorResumeNext(this, new Func1>() { @Override @SuppressWarnings("unchecked") - public Observable call(Exception e) { + public Observable call(Throwable e) { return (Observable) _f.call(e); } }); @@ -3664,7 +3664,7 @@ public Observable onErrorResumeNext(final Observable resumeSequence) { * Observable encounters an error * @return the original Observable with appropriately modified behavior */ - public Observable onErrorReturn(Func1 resumeFunction) { + public Observable onErrorReturn(Func1 resumeFunction) { return onErrorReturn(this, resumeFunction); } @@ -3694,11 +3694,11 @@ public Observable onErrorReturn(Func1 resumeFunction) { public Observable onErrorReturn(final Object resumeFunction) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(resumeFunction); - return onErrorReturn(this, new Func1() { + return onErrorReturn(this, new Func1() { @Override @SuppressWarnings("unchecked") - public T call(Exception e) { + public T call(Throwable e) { return (T) _f.call(e); } }); @@ -4376,7 +4376,7 @@ public Subscription call(Observer Observer) { verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -4453,7 +4453,7 @@ public void testMaterializeDematerializeChaining() { verify(observer, times(1)).onNext(1); verify(observer, times(1)).onCompleted(); - verify(observer, times(0)).onError(any(Exception.class)); + verify(observer, times(0)).onError(any(Throwable.class)); } /** @@ -4467,7 +4467,7 @@ public void testMaterializeDematerializeChaining() { public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); + final AtomicReference error = new AtomicReference(); Observable.create(new Func1, Subscription>() { @Override @@ -4499,7 +4499,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { error.set(e); System.out.println("error"); e.printStackTrace(); @@ -4533,7 +4533,7 @@ public void onNext(String v) { @Test public void testCustomObservableWithErrorInObserverSynchronous() { final AtomicInteger count = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); + final AtomicReference error = new AtomicReference(); Observable.create(new Func1, Subscription>() { @Override @@ -4553,7 +4553,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { error.set(e); System.out.println("error"); e.printStackTrace(); @@ -4584,7 +4584,7 @@ public void onNext(String v) { @Test public void testCustomObservableWithErrorInObservableSynchronous() { final AtomicInteger count = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); + final AtomicReference error = new AtomicReference(); Observable.create(new Func1, Subscription>() { @Override @@ -4601,7 +4601,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { error.set(e); System.out.println("error"); e.printStackTrace(); @@ -4803,7 +4803,7 @@ public void call(Object t1) { }); fail("expected exception"); - } catch (Exception e) { + } catch (Throwable e) { assertEquals("failure", e.getMessage()); } } @@ -4822,7 +4822,7 @@ public void call(Object t1) { @Test public void testErrorThrownWithoutErrorHandlerAsynchronous() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference exception = new AtomicReference(); + final AtomicReference exception = new AtomicReference(); Observable.create(new Func1, Subscription>() { @Override @@ -4833,7 +4833,7 @@ public Subscription call(final Observer observer) { public void run() { try { observer.onError(new RuntimeException("failure")); - } catch (Exception e) { + } catch (Throwable e) { // without an onError handler it has to just throw on whatever thread invokes it exception.set(e); } diff --git a/rxjava-core/src/main/java/rx/Observer.java b/rxjava-core/src/main/java/rx/Observer.java index 2b8b859f952..c18b3d4fdee 100644 --- a/rxjava-core/src/main/java/rx/Observer.java +++ b/rxjava-core/src/main/java/rx/Observer.java @@ -42,7 +42,7 @@ public interface Observer { * * @param e */ - public void onError(Exception e); + public void onError(Throwable e); /** * Provides the Observer with new data. diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 47d0583cfed..2d826a8fea7 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -415,7 +415,7 @@ private Subscription protectivelyWrapAndSubscribe(Observer o) { * NOTE: This will block even if the Observable is asynchronous. *

* This is similar to {@link #subscribe(Observer)}, but it blocks. Because it blocks it does - * not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods. + * not need the {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)} methods. *

* * @@ -426,7 +426,7 @@ private Subscription protectivelyWrapAndSubscribe(Observer o) { */ public void forEach(final Action1 onNext) { final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference exceptionFromOnError = new AtomicReference(); + final AtomicReference exceptionFromOnError = new AtomicReference(); /** * Wrapping since raw functions provided by the user are being invoked. @@ -440,7 +440,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { /* * If we receive an onError event we set the reference on the outer thread * so we can git it and throw after the latch.await(). @@ -483,7 +483,7 @@ public void onNext(T args) { * NOTE: This will block even if the Observable is asynchronous. *

* This is similar to {@link #subscribe(Observer)}, but it blocks. Because it blocks it does - * not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods. + * not need the {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)} methods. *

* * @@ -1026,7 +1026,7 @@ public void call(String t1) { } }); fail("we expect an exception to be thrown"); - } catch (Exception e) { + } catch (Throwable e) { // do nothing as we expect this } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationAll.java b/rxjava-core/src/main/java/rx/operators/OperationAll.java index 8084b7835e7..244122a3468 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationAll.java +++ b/rxjava-core/src/main/java/rx/operators/OperationAll.java @@ -61,7 +61,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { underlying.onError(e); } @@ -139,7 +139,7 @@ public Boolean call(String s) { @Test @SuppressWarnings("unchecked") public void testError() { - Exception error = new Exception(); + Throwable error = new Throwable(); Observable obs = Observable.error(error); Observer observer = mock(Observer.class); diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index f4fa22ab189..f5e6a692a5c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -379,7 +379,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { creator.stop(); buffers.emitAllBuffers(); observer.onError(e); @@ -901,7 +901,7 @@ public Subscription call(Observer observer) { buffered.subscribe(observer); Mockito.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); - Mockito.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + Mockito.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); Mockito.verify(observer, Mockito.times(1)).onCompleted(); } @@ -927,7 +927,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onNext(list("two", "three", "four")); inOrder.verify(observer, Mockito.times(1)).onNext(list("three", "four", "five")); inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); - inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); inOrder.verify(observer, Mockito.never()).onCompleted(); } @@ -953,7 +953,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three")); inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); - inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -979,7 +979,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two")); inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); - inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -1011,7 +1011,7 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); - inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -1040,7 +1040,7 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(201, TimeUnit.MILLISECONDS); inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five")); inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); - inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -1091,7 +1091,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onNext(list("two", "three")); inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); - inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } @@ -1133,7 +1133,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onNext(list("three", "four")); inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); - inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Exception.class)); + inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); inOrder.verify(observer, Mockito.times(1)).onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index dfcf8bcaf09..089a850e998 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -115,7 +115,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { a.error(e); } @@ -185,7 +185,7 @@ void complete(CombineObserver w) { /** * Receive error for a Observer. Throw the error up the chain and stop processing. */ - void error(Exception e) { + void error(Throwable e) { observer.onError(e); /* tell all observers to unsubscribe since we had an error */ stop(); @@ -226,7 +226,7 @@ void next(CombineObserver w, T arg) { try { R combinedValue = combineLatestFunction.call(argsToCombineLatest); observer.onNext(combinedValue); - } catch(Exception ex) { + } catch(Throwable ex) { observer.onError(ex); } } @@ -433,14 +433,14 @@ public void testAggregatorSimple() { InOrder inOrder = inOrder(aObserver); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("helloworld"); a.next(r1, "hello "); a.next(r2, "again"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("hello again"); @@ -476,14 +476,14 @@ public void testAggregatorDifferentSizedResultsWithOnComplete() { a.next(r2, "world"); a.complete(r2); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); a.next(r1, "hi"); a.complete(r1); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("hiworld"); } @@ -513,14 +513,14 @@ public void testAggregateMultipleTypes() { a.next(r2, "world"); a.complete(r2); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); a.next(r1, "hi"); a.complete(r1); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("hiworld"); } @@ -552,7 +552,7 @@ public void testAggregate3Types() { a.next(r2, 2); a.next(r3, new int[] { 5, 6, 7 }); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("hello2[5, 6, 7]"); } @@ -583,7 +583,7 @@ public void testAggregatorsWithDifferentSizesAndTiming() { a.next(r1, "three"); a.next(r2, "A"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("threeA"); @@ -599,7 +599,7 @@ public void testAggregatorsWithDifferentSizesAndTiming() { verify(aObserver, times(1)).onNext("fourE"); a.complete(r2); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -627,7 +627,7 @@ public void testAggregatorError() { a.next(r1, "hello"); a.next(r2, "world"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); @@ -635,7 +635,7 @@ public void testAggregatorError() { a.next(r1, "hello"); a.next(r2, "again"); - verify(aObserver, times(1)).onError(any(Exception.class)); + verify(aObserver, times(1)).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); // we don't want to be called again after an error verify(aObserver, times(0)).onNext("helloagain"); @@ -665,7 +665,7 @@ public void testAggregatorUnsubscribe() { a.next(r1, "hello"); a.next(r2, "world"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); @@ -673,7 +673,7 @@ public void testAggregatorUnsubscribe() { a.next(r1, "hello"); a.next(r2, "again"); - verify(aObserver, times(0)).onError(any(Exception.class)); + verify(aObserver, times(0)).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); // we don't want to be called again after an error verify(aObserver, times(0)).onNext("helloagain"); @@ -707,13 +707,13 @@ public void testAggregatorEarlyCompletion() { InOrder inOrder = inOrder(aObserver); - inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("twoA"); a.complete(r2); - inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, times(1)).onCompleted(); // we shouldn't get this since completed is called before any other onNext calls could trigger this inOrder.verify(aObserver, never()).onNext(anyString()); @@ -731,7 +731,7 @@ public void testCombineLatest2Types() { Observable w = Observable.create(combineLatest(Observable.from("one", "two"), Observable.from(2, 3, 4), combineLatestFunction)); w.subscribe(aObserver); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("two2"); verify(aObserver, times(1)).onNext("two3"); @@ -750,7 +750,7 @@ public void testCombineLatest3TypesA() { Observable w = Observable.create(combineLatest(Observable.from("one", "two"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }), combineLatestFunction)); w.subscribe(aObserver); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("two2[4, 5, 6]"); } @@ -767,7 +767,7 @@ public void testCombineLatest3TypesB() { Observable w = Observable.create(combineLatest(Observable.from("one"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction)); w.subscribe(aObserver); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("one2[4, 5, 6]"); verify(aObserver, times(1)).onNext("one2[7, 8]"); diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 0a2706cca96..96504c531a4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -89,7 +89,7 @@ public void onNext(T item) { observer.onNext(item); } @Override - public void onError(Exception e) { + public void onError(Throwable e) { if (completedOrErred.compareAndSet(false, true)) { outerSubscription.unsubscribe(); observer.onError(e); @@ -131,7 +131,7 @@ public void onNext(Observable nextSequence) { } } @Override - public void onError(Exception e) { + public void onError(Throwable e) { if (completedOrErred.compareAndSet(false, true)) { if (innerSubscription != null) { innerSubscription.unsubscribe(); @@ -258,7 +258,7 @@ public void testSimpleAsyncConcat() { // wait for async observables to complete o1.t.join(); o2.t.join(); - } catch (Exception e) { + } catch (Throwable e) { throw new RuntimeException("failed waiting on threads"); } @@ -276,7 +276,7 @@ public void testSimpleAsyncConcat() { */ @SuppressWarnings("unchecked") @Test - public void testNestedAsyncConcat() throws Exception { + public void testNestedAsyncConcat() throws Throwable { Observer observer = mock(Observer.class); final TestObservable o1 = new TestObservable("one", "two", "three"); @@ -317,7 +317,7 @@ public void run() { observer.onNext(o3); } - } catch (Exception e) { + } catch (Throwable e) { observer.onError(e); } finally { System.out.println("Done parent Observable"); @@ -349,7 +349,7 @@ public void run() { } System.out.println("Thread2 started ... waiting for it to complete ..."); o2.t.join(); - } catch (Exception e) { + } catch (Throwable e) { throw new RuntimeException("failed waiting on threads", e); } @@ -366,7 +366,7 @@ public void run() { inOrder.verify(observer, never()).onNext("nine"); // we should not be completed yet verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); // now allow the third allowThird.countDown(); @@ -377,7 +377,7 @@ public void run() { } // wait for 3rd to complete o3.t.join(); - } catch (Exception e) { + } catch (Throwable e) { throw new RuntimeException("failed waiting on threads", e); } @@ -386,7 +386,7 @@ public void run() { inOrder.verify(observer, times(1)).onNext("nine"); inOrder.verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); } @SuppressWarnings("unchecked") @@ -407,7 +407,7 @@ public void testBlockedObservableOfObservables() { try { //Block main thread to allow observables to serve up o1. callOnce.await(); - } catch (Exception ex) { + } catch (Throwable ex) { ex.printStackTrace(); fail(ex.getMessage()); } @@ -421,7 +421,7 @@ public void testBlockedObservableOfObservables() { // unblock observables so it can serve up o2 and complete okToContinue.countDown(); observableOfObservables.t.join(); - } catch (Exception ex) { + } catch (Throwable ex) { ex.printStackTrace(); fail(ex.getMessage()); } @@ -463,7 +463,7 @@ public void testConcatConcurrentWithInfinity() { inOrder.verify(aObserver, times(1)).onNext("three"); inOrder.verify(aObserver, times(47)).onNext("hello"); verify(aObserver, times(1)).onCompleted(); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); } @@ -558,7 +558,7 @@ public void testConcatUnsubscribe() { okToContinue.countDown(); w1.t.join(); w2.t.join(); - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); } @@ -604,7 +604,7 @@ public void testConcatUnsubscribeConcurrent() { okToContinue.countDown(); w1.t.join(); w2.t.join(); - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); } @@ -617,7 +617,7 @@ public void testConcatUnsubscribeConcurrent() { inOrder.verify(aObserver, never()).onNext("five"); inOrder.verify(aObserver, never()).onNext("six"); verify(aObserver, never()).onCompleted(); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); } private static class TestObservable extends Observable { diff --git a/rxjava-core/src/main/java/rx/operators/OperationDefer.java b/rxjava-core/src/main/java/rx/operators/OperationDefer.java index ab7d05b0198..169227f17c3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDefer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDefer.java @@ -52,7 +52,7 @@ public Subscription call(Observer observer) { public static class UnitTest { @Test @SuppressWarnings("unchecked") - public void testDefer() throws Exception { + public void testDefer() throws Throwable { Func0> factory = mock(Func0.class); diff --git a/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java b/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java index b3b4d95c3f9..3d5c2d83929 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java @@ -65,7 +65,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { } @Override @@ -75,7 +75,7 @@ public void onNext(Notification value) { observer.onNext(value.getValue()); break; case OnError: - observer.onError(value.getException()); + observer.onError(value.getThrowable()); break; case OnCompleted: observer.onCompleted(); @@ -100,12 +100,27 @@ public void testDematerialize1() { verify(aObserver, times(1)).onNext(1); verify(aObserver, times(1)).onNext(2); verify(aObserver, times(1)).onCompleted(); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); } @Test @SuppressWarnings("unchecked") public void testDematerialize2() { + Throwable exception = new Throwable("test"); + Observable observable = Observable.error(exception); + Observable dematerialize = Observable.create(dematerialize(observable.materialize())); + + Observer aObserver = mock(Observer.class); + dematerialize.subscribe(aObserver); + + verify(aObserver, times(1)).onError(exception); + verify(aObserver, times(0)).onCompleted(); + verify(aObserver, times(0)).onNext(any(Integer.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testDematerialize3() { Exception exception = new Exception("test"); Observable observable = Observable.error(exception); Observable dematerialize = Observable.create(dematerialize(observable.materialize())); diff --git a/rxjava-core/src/main/java/rx/operators/OperationFilter.java b/rxjava-core/src/main/java/rx/operators/OperationFilter.java index 21dddafed7d..1e0b7d6fd9d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFilter.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFilter.java @@ -55,14 +55,14 @@ public void onNext(T value) { if (predicate.call(value)) { observer.onNext(value); } - } catch (Exception ex) { + } catch (Throwable ex) { observer.onError(ex); // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable subscription.unsubscribe(); } } - public void onError(Exception ex) { + public void onError(Throwable ex) { observer.onError(ex); } @@ -93,7 +93,7 @@ public Boolean call(String t1) { verify(aObserver, Mockito.never()).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, Mockito.never()).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationFinally.java b/rxjava-core/src/main/java/rx/operators/OperationFinally.java index 1192e9f3ac9..82fcfd4686f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFinally.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFinally.java @@ -91,7 +91,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { try { observer.onError(e); } finally { diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index fdfbb1c733d..4a673b6fee6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -95,7 +95,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { // we need to propagate to all children I imagine ... we can't just leave all of those Observable/Observers hanging for (GroupedSubject o : groupedObservables.values()) { o.onError(e); @@ -215,7 +215,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { subscribedObserver.get().onError(e); } @@ -235,7 +235,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { // do nothing } @@ -298,7 +298,7 @@ public void testError() { final AtomicInteger groupCounter = new AtomicInteger(); final AtomicInteger eventCounter = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); + final AtomicReference error = new AtomicReference(); grouped.mapMany(new Func1, Observable>() { @@ -321,7 +321,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { e.printStackTrace(); error.set(e); } @@ -365,10 +365,10 @@ public void call(V v) { /** * Assert that only a single subscription to a stream occurs and that all events are received. * - * @throws Exception + * @throws Throwable */ @Test - public void testGroupedEventStream() throws Exception { + public void testGroupedEventStream() throws Throwable { final AtomicInteger eventCounter = new AtomicInteger(); final AtomicInteger subscribeCounter = new AtomicInteger(); @@ -432,7 +432,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { e.printStackTrace(); latch.countDown(); } @@ -529,7 +529,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { e.printStackTrace(); latch.countDown(); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index a9a46d2bbe6..3b0b00ca157 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -106,7 +106,7 @@ public void testInterval() { verify(observer, never()).onNext(0L); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(2, TimeUnit.SECONDS); @@ -115,13 +115,13 @@ public void testInterval() { inOrder.verify(observer, times(1)).onNext(1L); inOrder.verify(observer, never()).onNext(2L); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); sub.unsubscribe(); scheduler.advanceTimeTo(4, TimeUnit.SECONDS); verify(observer, never()).onNext(2L); verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMap.java b/rxjava-core/src/main/java/rx/operators/OperationMap.java index c6435ae081d..85ec5b38b3c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMap.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMap.java @@ -125,7 +125,7 @@ public void onNext(T value) { observer.onNext(func.call(value)); } - public void onError(Exception ex) { + public void onError(Throwable ex) { observer.onError(ex); } @@ -160,7 +160,7 @@ public String call(Map map) { })); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onNext("OneFirst"); verify(stringObserver, times(1)).onNext("TwoFirst"); verify(stringObserver, times(1)).onCompleted(); @@ -202,7 +202,7 @@ public String call(Map map) { })); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onNext("OneFirst"); verify(stringObserver, times(1)).onNext("TwoFirst"); verify(stringObserver, times(1)).onNext("ThreeFirst"); @@ -241,7 +241,7 @@ public String call(Map map) { })); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onNext("OneFirst"); verify(stringObserver, times(1)).onNext("TwoFirst"); verify(stringObserver, times(1)).onNext("ThreeFirst"); @@ -277,7 +277,7 @@ public String call(String s) { verify(stringObserver, never()).onNext("two"); verify(stringObserver, never()).onNext("three"); verify(stringObserver, never()).onCompleted(); - verify(stringObserver, times(1)).onError(any(Exception.class)); + verify(stringObserver, times(1)).onError(any(Throwable.class)); // we should have only returned 1 value: "one" assertEquals(1, c1.get()); diff --git a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java index 279f4202d08..1e22154e146 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java @@ -70,7 +70,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { observer.onNext(new Notification(e)); observer.onCompleted(); } @@ -108,7 +108,7 @@ public void testMaterialize1() { assertTrue(Observer.notifications.get(0).isOnNext()); assertEquals("two", Observer.notifications.get(1).getValue()); assertTrue(Observer.notifications.get(1).isOnNext()); - assertEquals(NullPointerException.class, Observer.notifications.get(2).getException().getClass()); + assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable().getClass()); assertTrue(Observer.notifications.get(2).isOnError()); } @@ -174,7 +174,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { this.onError = true; } @@ -206,7 +206,7 @@ public void run() { System.out.println("throwing exception"); try { Thread.sleep(100); - } catch (Exception e) { + } catch (Throwable e) { } observer.onError(new NullPointerException()); diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 2710141603d..c183fe957eb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -202,7 +202,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { actualObserver.onError(e); } @@ -260,7 +260,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { if (!stopped.get()) { if (ourSubscription.stop()) { // this thread 'won' the race to unsubscribe/stop so let's send the error @@ -318,7 +318,7 @@ public void unsubscribe() { Observable m = Observable.create(merge(observableOfObservables)); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onCompleted(); verify(stringObserver, times(2)).onNext("hello"); } @@ -332,7 +332,7 @@ public void testMergeArray() { Observable m = Observable.create(merge(o1, o2)); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(2)).onNext("hello"); verify(stringObserver, times(1)).onCompleted(); } @@ -348,7 +348,7 @@ public void testMergeList() { Observable m = Observable.create(merge(listOfObservables)); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onCompleted(); verify(stringObserver, times(2)).onNext("hello"); } @@ -370,7 +370,7 @@ public void testUnSubscribe() { tA.sendOnCompleted(); tB.sendOnCompleted(); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onNext("Aone"); verify(stringObserver, times(1)).onNext("Bone"); assertTrue(tA.unsubscribed); @@ -396,13 +396,13 @@ public void testMergeArrayWithThreading() { throw new RuntimeException(e); } - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(2)).onNext("hello"); verify(stringObserver, times(1)).onCompleted(); } @Test - public void testSynchronizationOfMultipleSequences() throws Exception { + public void testSynchronizationOfMultipleSequences() throws Throwable { final TestASynchronousObservable o1 = new TestASynchronousObservable(); final TestASynchronousObservable o2 = new TestASynchronousObservable(); @@ -422,7 +422,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { throw new RuntimeException("failed", e); } @@ -604,7 +604,7 @@ public void sendOnNext(String value) { /* used to simulate subscription */ @SuppressWarnings("unused") - public void sendOnError(Exception e) { + public void sendOnError(Throwable e) { observer.onError(e); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java index b7e7eb05981..a54f193a2f0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java @@ -154,7 +154,7 @@ private static final class MergeDelayErrorObservable implements Func1 childObservers = new ConcurrentHashMap(); private final ConcurrentHashMap childSubscriptions = new ConcurrentHashMap(); // onErrors we received that will be delayed until everything is completed and then sent - private ConcurrentLinkedQueue onErrorReceived = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue onErrorReceived = new ConcurrentLinkedQueue(); private MergeDelayErrorObservable(Observable> sequences) { this.sequences = sequences; @@ -236,7 +236,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { actualObserver.onError(e); } @@ -290,7 +290,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { if (!stopped.get()) { onErrorReceived.add(e); // mark this ChildObserver as done @@ -548,7 +548,7 @@ public void unsubscribe() { Observable m = Observable.create(mergeDelayError(observableOfObservables)); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onCompleted(); verify(stringObserver, times(2)).onNext("hello"); } @@ -562,7 +562,7 @@ public void testMergeArray() { Observable m = Observable.create(mergeDelayError(o1, o2)); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(2)).onNext("hello"); verify(stringObserver, times(1)).onCompleted(); } @@ -578,7 +578,7 @@ public void testMergeList() { Observable m = Observable.create(mergeDelayError(listOfObservables)); m.subscribe(stringObserver); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onCompleted(); verify(stringObserver, times(2)).onNext("hello"); } @@ -600,7 +600,7 @@ public void testUnSubscribe() { tA.sendOnCompleted(); tB.sendOnCompleted(); - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(1)).onNext("Aone"); verify(stringObserver, times(1)).onNext("Bone"); assertTrue(tA.unsubscribed); @@ -626,7 +626,7 @@ public void testMergeArrayWithThreading() { throw new RuntimeException(e); } - verify(stringObserver, never()).onError(any(Exception.class)); + verify(stringObserver, never()).onError(any(Throwable.class)); verify(stringObserver, times(2)).onNext("hello"); verify(stringObserver, times(1)).onCompleted(); } @@ -706,7 +706,7 @@ public void sendOnNext(String value) { /* used to simulate subscription */ @SuppressWarnings("unused") - public void sendOnError(Exception e) { + public void sendOnError(Throwable e) { observer.onError(e); } @@ -775,7 +775,7 @@ public void run() { System.out.println("throwing exception"); try { Thread.sleep(100); - } catch (Exception e) { + } catch (Throwable e) { } observer.onError(new NullPointerException()); @@ -803,7 +803,7 @@ public void unsubscribe() { } private static class CaptureObserver implements Observer { - volatile Exception e; + volatile Throwable e; @Override public void onCompleted() { @@ -811,7 +811,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { this.e = e; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java index 4a14fe325e3..ce7e3dbbcc6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java @@ -68,8 +68,8 @@ public boolean hasNext() { @Override public T next() { - if (observer.getException() != null) { - throw Exceptions.propagate(observer.getException()); + if (observer.getThrowable() != null) { + throw Exceptions.propagate(observer.getThrowable()); } return observer.getRecentValue(); } @@ -83,7 +83,7 @@ public void remove() { private static class MostRecentObserver implements Observer { private final AtomicBoolean completed = new AtomicBoolean(false); private final AtomicReference value; - private final AtomicReference exception = new AtomicReference(); + private final AtomicReference exception = new AtomicReference(); private MostRecentObserver(T value) { this.value = new AtomicReference(value); @@ -95,7 +95,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { exception.set(e); } @@ -104,15 +104,15 @@ public void onNext(T args) { value.set(args); } - public boolean isCompleted() { + private boolean isCompleted() { return completed.get(); } - public Exception getException() { + private Throwable getThrowable() { return exception.get(); } - public T getRecentValue() { + private T getRecentValue() { return value.get(); } @@ -182,7 +182,7 @@ public void sendOnNext(String value) { } /* used to simulate subscription */ - public void sendOnError(Exception e) { + public void sendOnError(Throwable e) { observer.onError(e); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index ad312fa1dd5..8a7db21dda8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -60,7 +60,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { subject.onError(e); } @@ -185,7 +185,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { // Do nothing } @@ -204,7 +204,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { // Do nothing } diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index ad721913822..587141b1eb3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -109,7 +109,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { // ignore } @@ -142,7 +142,7 @@ public boolean isCompleted(boolean rethrowExceptionIfExists) { if (lastItem.isOnError()) { if (rethrowExceptionIfExists) { - throw Exceptions.propagate(lastItem.getException()); + throw Exceptions.propagate(lastItem.getThrowable()); } else { return true; } @@ -155,7 +155,7 @@ public T takeNext() throws InterruptedException { Notification next = buf.take(); if (next.isOnError()) { - throw Exceptions.propagate(next.getException()); + throw Exceptions.propagate(next.getThrowable()); } if (next.isOnCompleted()) { @@ -172,7 +172,7 @@ public static class UnitTest { private final ExecutorService executor = Executors.newSingleThreadExecutor(); @Test - public void testNext() throws Exception { + public void testNext() throws Throwable { Subscription s = mock(Subscription.class); final TestObservable obs = new TestObservable(s); @@ -249,13 +249,13 @@ public void testOnErrorViaHasNext() throws Throwable { // this should not throw an exception but instead just return false try { assertFalse(it.hasNext()); - } catch (Exception e) { + } catch (Throwable e) { fail("should not have received exception"); e.printStackTrace(); } } - private Future nextAsync(final Iterator it) throws Exception { + private Future nextAsync(final Iterator it) throws Throwable { return executor.submit(new Callable() { @@ -286,7 +286,7 @@ public void sendOnNext(String value) { } /* used to simulate subscription */ - public void sendOnError(Exception e) { + public void sendOnError(Throwable e) { observer.onError(e); } @@ -308,10 +308,10 @@ private static class TestException extends RuntimeException { * * This results in output such as => a: 1 b: 2 c: 89 * - * @throws Exception + * @throws Throwable */ @Test - public void testNoBufferingOrBlockingOfSequence() throws Exception { + public void testNoBufferingOrBlockingOfSequence() throws Throwable { final CountDownLatch finished = new CountDownLatch(1); final AtomicBoolean running = new AtomicBoolean(true); final AtomicInteger count = new AtomicInteger(0); @@ -329,7 +329,7 @@ public void run() { Thread.sleep(0, 100); } o.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { o.onError(e); } finally { finished.countDown(); diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java index c7ae0c3a3e6..7110804dd97 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java @@ -53,16 +53,16 @@ */ public final class OperationOnErrorResumeNextViaFunction { - public static Func1, Subscription> onErrorResumeNextViaFunction(Observable originalSequence, Func1> resumeFunction) { + public static Func1, Subscription> onErrorResumeNextViaFunction(Observable originalSequence, Func1> resumeFunction) { return new OnErrorResumeNextViaFunction(originalSequence, resumeFunction); } private static class OnErrorResumeNextViaFunction implements Func1, Subscription> { - private final Func1> resumeFunction; + private final Func1> resumeFunction; private final Observable originalSequence; - public OnErrorResumeNextViaFunction(Observable originalSequence, Func1> resumeFunction) { + public OnErrorResumeNextViaFunction(Observable originalSequence, Func1> resumeFunction) { this.resumeFunction = resumeFunction; this.originalSequence = originalSequence; } @@ -81,7 +81,7 @@ public void onNext(T value) { /** * Instead of passing the onError forward, we intercept and "resume" with the resumeSequence. */ - public void onError(Exception ex) { + public void onError(Throwable ex) { /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ SafeObservableSubscription currentSubscription = subscriptionRef.get(); // check that we have not been unsubscribed before we can process the error @@ -96,7 +96,7 @@ public void onError(Exception ex) { // so we want to immediately unsubscribe from the resumeSequence we just subscribed to innerSubscription.unsubscribe(); } - } catch (Exception e) { + } catch (Throwable e) { // the resume function failed so we need to call onError // I am using CompositeException so that both exceptions can be seen observer.onError(new CompositeException("OnErrorResume function failed", Arrays.asList(ex, e))); @@ -126,21 +126,21 @@ public static class UnitTest { @Test public void testResumeNextWithSynchronousExecution() { - final AtomicReference receivedException = new AtomicReference(); + final AtomicReference receivedException = new AtomicReference(); Observable w = Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { observer.onNext("one"); - observer.onError(new Exception("injected failure")); + observer.onError(new Throwable("injected failure")); return Subscriptions.empty(); } }); - Func1> resume = new Func1>() { + Func1> resume = new Func1>() { @Override - public Observable call(Exception t1) { + public Observable call(Throwable t1) { receivedException.set(t1); return Observable.from("twoResume", "threeResume"); } @@ -152,7 +152,7 @@ public Observable call(Exception t1) { Observer aObserver = mock(Observer.class); observable.subscribe(aObserver); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("one"); verify(aObserver, Mockito.never()).onNext("two"); @@ -164,13 +164,13 @@ public Observable call(Exception t1) { @Test public void testResumeNextWithAsyncExecution() { - final AtomicReference receivedException = new AtomicReference(); + final AtomicReference receivedException = new AtomicReference(); Subscription s = mock(Subscription.class); TestObservable w = new TestObservable(s, "one"); - Func1> resume = new Func1>() { + Func1> resume = new Func1>() { @Override - public Observable call(Exception t1) { + public Observable call(Throwable t1) { receivedException.set(t1); return Observable.from("twoResume", "threeResume"); } @@ -188,7 +188,7 @@ public Observable call(Exception t1) { fail(e.getMessage()); } - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("one"); verify(aObserver, Mockito.never()).onNext("two"); @@ -205,10 +205,10 @@ public Observable call(Exception t1) { public void testFunctionThrowsError() { Subscription s = mock(Subscription.class); TestObservable w = new TestObservable(s, "one"); - Func1> resume = new Func1>() { + Func1> resume = new Func1>() { @Override - public Observable call(Exception t1) { + public Observable call(Throwable t1) { throw new RuntimeException("exception from function"); } @@ -229,7 +229,7 @@ public Observable call(Exception t1) { verify(aObserver, times(1)).onNext("one"); // we should have received an onError call on the Observer since the resume function threw an exception - verify(aObserver, times(1)).onError(any(Exception.class)); + verify(aObserver, times(1)).onError(any(Throwable.class)); verify(aObserver, times(0)).onCompleted(); } @@ -258,7 +258,7 @@ public void run() { observer.onNext(s); } throw new RuntimeException("Forced Failure"); - } catch (Exception e) { + } catch (Throwable e) { observer.onError(e); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java index e9e460e8f04..4ea879bdb9d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java @@ -81,7 +81,7 @@ public void onNext(T value) { /** * Instead of passing the onError forward, we intercept and "resume" with the resumeSequence. */ - public void onError(Exception ex) { + public void onError(Throwable ex) { /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ SafeObservableSubscription currentSubscription = subscriptionRef.get(); // check that we have not been unsubscribed and not already resumed before we can process the error @@ -136,7 +136,7 @@ public void testResumeNext() { fail(e.getMessage()); } - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("one"); verify(aObserver, Mockito.never()).onNext("two"); @@ -176,7 +176,7 @@ public String call(String s) { fail(e.getMessage()); } - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("one"); verify(aObserver, Mockito.never()).onNext("two"); @@ -213,7 +213,7 @@ public void run() { } System.out.println("TestObservable onCompleted"); observer.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { System.out.println("TestObservable onError: " + e); observer.onError(e); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java index ea1c9601bba..81a242ec25f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java @@ -50,15 +50,15 @@ */ public final class OperationOnErrorReturn { - public static Func1, Subscription> onErrorReturn(Observable originalSequence, Func1 resumeFunction) { + public static Func1, Subscription> onErrorReturn(Observable originalSequence, Func1 resumeFunction) { return new OnErrorReturn(originalSequence, resumeFunction); } private static class OnErrorReturn implements Func1, Subscription> { - private final Func1 resumeFunction; + private final Func1 resumeFunction; private final Observable originalSequence; - public OnErrorReturn(Observable originalSequence, Func1 resumeFunction) { + public OnErrorReturn(Observable originalSequence, Func1 resumeFunction) { this.resumeFunction = resumeFunction; this.originalSequence = originalSequence; } @@ -79,7 +79,7 @@ public void onNext(T value) { /** * Instead of passing the onError forward, we intercept and "resume" with the resumeSequence. */ - public void onError(Exception ex) { + public void onError(Throwable ex) { /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ SafeObservableSubscription currentSubscription = subscriptionRef.get(); // check that we have not been unsubscribed before we can process the error @@ -99,7 +99,7 @@ public void onError(Exception ex) { /* unsubscribe since it blew up */ currentSubscription.unsubscribe(); - } catch (Exception e) { + } catch (Throwable e) { // the return function failed so we need to call onError // I am using CompositeException so that both exceptions can be seen observer.onError(new CompositeException("OnErrorReturn function failed", Arrays.asList(ex, e))); @@ -131,12 +131,12 @@ public static class UnitTest { public void testResumeNext() { Subscription s = mock(Subscription.class); TestObservable w = new TestObservable(s, "one"); - final AtomicReference capturedException = new AtomicReference(); + final AtomicReference capturedException = new AtomicReference(); - Observable observable = Observable.create(onErrorReturn(w, new Func1() { + Observable observable = Observable.create(onErrorReturn(w, new Func1() { @Override - public String call(Exception e) { + public String call(Throwable e) { capturedException.set(e); return "failure"; } @@ -153,7 +153,7 @@ public String call(Exception e) { fail(e.getMessage()); } - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("failure"); @@ -167,12 +167,12 @@ public String call(Exception e) { public void testFunctionThrowsError() { Subscription s = mock(Subscription.class); TestObservable w = new TestObservable(s, "one"); - final AtomicReference capturedException = new AtomicReference(); + final AtomicReference capturedException = new AtomicReference(); - Observable observable = Observable.create(onErrorReturn(w, new Func1() { + Observable observable = Observable.create(onErrorReturn(w, new Func1() { @Override - public String call(Exception e) { + public String call(Throwable e) { capturedException.set(e); throw new RuntimeException("exception from function"); } @@ -193,7 +193,7 @@ public String call(Exception e) { verify(aObserver, times(1)).onNext("one"); // we should have received an onError call on the Observer since the resume function threw an exception - verify(aObserver, times(1)).onError(any(Exception.class)); + verify(aObserver, times(1)).onError(any(Throwable.class)); verify(aObserver, times(0)).onCompleted(); assertNotNull(capturedException.get()); } @@ -223,7 +223,7 @@ public void run() { observer.onNext(s); } throw new RuntimeException("Forced Failure"); - } catch (Exception e) { + } catch (Throwable e) { observer.onError(e); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java index 74482c6bfb0..b389fed150a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSample.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -83,7 +83,7 @@ public Subscription call(final Observer observer) { public void onCompleted() { /* the clock never completes */ } @Override - public void onError(Exception e) { /* the clock has no errors */ } + public void onError(Throwable e) { /* the clock has no errors */ } @Override public void onNext(Long tick) { @@ -101,7 +101,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { clockSubscription.unsubscribe(); observer.onError(e); } @@ -170,31 +170,31 @@ public void call() { scheduler.advanceTimeTo(800L, TimeUnit.MILLISECONDS); verify(observer, never()).onNext(any(Long.class)); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(1200L, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext(1L); verify(observer, never()).onNext(2L); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(1600L, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext(1L); verify(observer, never()).onNext(2L); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(1L); inOrder.verify(observer, times(1)).onNext(2L); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(3000L, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(1L); inOrder.verify(observer, times(2)).onNext(2L); verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index 8a23f606dea..24cb6ea66ff 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -101,7 +101,7 @@ public synchronized void onNext(T value) { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { observer.onError(e); } @@ -156,13 +156,13 @@ public synchronized void onNext(T value) { try { acc = accumulatorFunction.call(acc, value); observer.onNext(acc); - } catch (Exception ex) { + } catch (Throwable ex) { observer.onError(ex); } } @Override - public void onError(Exception e) { + public void onError(Throwable e) { observer.onError(e); } @@ -196,14 +196,14 @@ public String call(String s, Integer n) { })); m.subscribe(observer); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(""); verify(observer, times(1)).onNext("1"); verify(observer, times(1)).onNext("12"); verify(observer, times(1)).onNext("123"); verify(observer, times(4)).onNext(anyString()); verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); } @Test @@ -223,14 +223,14 @@ public Integer call(Integer t1, Integer t2) { })); m.subscribe(Observer); - verify(Observer, never()).onError(any(Exception.class)); + verify(Observer, never()).onError(any(Throwable.class)); verify(Observer, never()).onNext(0); verify(Observer, times(1)).onNext(1); verify(Observer, times(1)).onNext(3); verify(Observer, times(1)).onNext(6); verify(Observer, times(3)).onNext(anyInt()); verify(Observer, times(1)).onCompleted(); - verify(Observer, never()).onError(any(Exception.class)); + verify(Observer, never()).onError(any(Throwable.class)); } @Test @@ -250,12 +250,12 @@ public Integer call(Integer t1, Integer t2) { })); m.subscribe(Observer); - verify(Observer, never()).onError(any(Exception.class)); + verify(Observer, never()).onError(any(Throwable.class)); verify(Observer, never()).onNext(0); verify(Observer, times(1)).onNext(1); verify(Observer, times(1)).onNext(anyInt()); verify(Observer, times(1)).onCompleted(); - verify(Observer, never()).onError(any(Exception.class)); + verify(Observer, never()).onError(any(Throwable.class)); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkip.java b/rxjava-core/src/main/java/rx/operators/OperationSkip.java index c3763e8d8a3..c238b2d2aeb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkip.java @@ -97,7 +97,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { observer.onError(e); } @@ -126,7 +126,7 @@ public void testSkip1() { verify(aObserver, never()).onNext("one"); verify(aObserver, never()).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -141,7 +141,7 @@ public void testSkip2() { verify(aObserver, never()).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java index 3e2e3fdfac9..9de1b0be728 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java @@ -93,7 +93,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { unsubscribeFromSubSequence(); observer.onError(e); } @@ -109,7 +109,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { parent.unsubscribe(); observer.onError(e); } @@ -178,27 +178,27 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(90, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(anyString()); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(125, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("one"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(175, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("two"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(225, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("three"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(anyString()); verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); } @Test @@ -238,22 +238,22 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(90, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(anyString()); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(125, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("one"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(175, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("two"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(225, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("three"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(anyString()); @@ -303,17 +303,17 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(90, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(anyString()); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(125, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("one"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("three"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); } @Test @@ -358,12 +358,12 @@ public Subscription call(Observer observer) { scheduler.advanceTimeTo(90, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(anyString()); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(125, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("one"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, never()).onError(any(Throwable.class)); scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext("three"); @@ -380,7 +380,7 @@ public void call() { }, delay, TimeUnit.MILLISECONDS); } - private void publishError(final Observer observer, long delay, final Exception error) { + private void publishError(final Observer observer, long delay, final Throwable error) { scheduler.schedule(new Action0() { @Override public void call() { @@ -399,7 +399,7 @@ public void call() { } @SuppressWarnings("serial") - private class TestException extends Exception { + private class TestException extends Throwable { } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java index b6998eaa1d2..b1820630387 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java @@ -137,7 +137,7 @@ public void testOnErrorAfterUnSubscribe() { t.sendOnError(new RuntimeException("bad")); verify(w, times(1)).onNext("one"); - verify(w, Mockito.never()).onError(any(Exception.class)); + verify(w, Mockito.never()).onError(any(Throwable.class)); } /** @@ -158,7 +158,7 @@ public void testOnNextAfterOnError() { t.sendOnNext("two"); verify(w, times(1)).onNext("one"); - verify(w, times(1)).onError(any(Exception.class)); + verify(w, times(1)).onError(any(Throwable.class)); verify(w, Mockito.never()).onNext("two"); } @@ -180,7 +180,7 @@ public void testOnCompletedAfterOnError() { t.sendOnCompleted(); verify(w, times(1)).onNext("one"); - verify(w, times(1)).onError(any(Exception.class)); + verify(w, times(1)).onError(any(Throwable.class)); verify(w, Mockito.never()).onCompleted(); } @@ -204,7 +204,7 @@ public void testOnNextAfterOnCompleted() { verify(w, times(1)).onNext("one"); verify(w, Mockito.never()).onNext("two"); verify(w, times(1)).onCompleted(); - verify(w, Mockito.never()).onError(any(Exception.class)); + verify(w, Mockito.never()).onError(any(Throwable.class)); } /** @@ -226,7 +226,7 @@ public void testOnErrorAfterOnCompleted() { verify(w, times(1)).onNext("one"); verify(w, times(1)).onCompleted(); - verify(w, Mockito.never()).onError(any(Exception.class)); + verify(w, Mockito.never()).onError(any(Throwable.class)); } /** @@ -250,7 +250,7 @@ public void sendOnNext(String value) { } /* used to simulate subscription */ - public void sendOnError(Exception e) { + public void sendOnError(Throwable e) { observer.onError(e); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index baefcbf8f31..c88001f1d14 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -95,7 +95,7 @@ public void onCompleted() } @Override - public void onError(Exception e) + public void onError(Throwable e) { } @@ -128,7 +128,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { if (counter.getAndSet(num) < num) { observer.onError(e); } @@ -166,7 +166,7 @@ public void testTake1() { verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, never()).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -181,7 +181,7 @@ public void testTake2() { verify(aObserver, times(1)).onNext("one"); verify(aObserver, never()).onNext("two"); verify(aObserver, never()).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -193,7 +193,7 @@ public void testTakeDoesntLeakErrors() { public Subscription call(Observer observer) { observer.onNext("one"); - observer.onError(new Exception("test failed")); + observer.onError(new Throwable("test failed")); return Subscriptions.empty(); } }); @@ -220,7 +220,7 @@ public void testTakeZeroDoesntLeakError() { public Subscription call(Observer observer) { subscribed.set(true); - observer.onError(new Exception("test failed")); + observer.onError(new Throwable("test failed")); return new Subscription() { @Override @@ -259,7 +259,7 @@ public void testUnsubscribeAfterTake() { // wait for the Observable to complete try { w.t.join(); - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); } @@ -298,7 +298,7 @@ public void run() { observer.onNext(s); } observer.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { throw new RuntimeException(e); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index 569d4cc603f..2abe8381b72 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -81,7 +81,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { observer.onError(e); } @@ -107,7 +107,7 @@ public void testTakeLastEmpty() { Observer aObserver = mock(Observer.class); take.subscribe(aObserver); verify(aObserver, never()).onNext(any(String.class)); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -123,7 +123,7 @@ public void testTakeLast1() { inOrder.verify(aObserver, times(1)).onNext("two"); inOrder.verify(aObserver, times(1)).onNext("three"); verify(aObserver, never()).onNext("one"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -136,7 +136,7 @@ public void testTakeLast2() { Observer aObserver = mock(Observer.class); take.subscribe(aObserver); verify(aObserver, times(1)).onNext("one"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java b/rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java index 6c5ba775106..fb33a086425 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java @@ -105,7 +105,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { notificationObserver.onError(e); } @@ -133,7 +133,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { notificationObserver.onError(e); } @@ -203,7 +203,7 @@ public void testTakeUntilSourceError() { Subscription sOther = mock(Subscription.class); TestObservable source = new TestObservable(sSource); TestObservable other = new TestObservable(sOther); - Exception error = new Exception(); + Throwable error = new Throwable(); Observer result = mock(Observer.class); Observable stringObservable = takeUntil(source, other); @@ -227,7 +227,7 @@ public void testTakeUntilOtherError() { Subscription sOther = mock(Subscription.class); TestObservable source = new TestObservable(sSource); TestObservable other = new TestObservable(sOther); - Exception error = new Exception(); + Throwable error = new Throwable(); Observer result = mock(Observer.class); Observable stringObservable = takeUntil(source, other); @@ -288,7 +288,7 @@ public void sendOnNext(String value) { } /* used to simulate subscription */ - public void sendOnError(Exception e) { + public void sendOnError(Throwable e) { observer.onError(e); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index 273e6cac67a..12038e72b09 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -125,7 +125,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { observer.onError(e); } @@ -134,7 +134,7 @@ public void onNext(T args) { Boolean isSelected; try { isSelected = predicate.call(args, counter.getAndIncrement()); - } catch (Exception e) { + } catch (Throwable e) { observer.onError(e); return; } @@ -171,7 +171,7 @@ public Boolean call(Integer input) verify(aObserver, times(1)).onNext(1); verify(aObserver, times(1)).onNext(2); verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -203,7 +203,7 @@ public Boolean call(Integer input) verify(aObserver, never()).onNext(3); verify(aObserver, never()).onNext(4); verify(aObserver, never()).onNext(5); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -225,7 +225,7 @@ public Boolean call(String input, Integer index) verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, never()).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -237,7 +237,7 @@ public void testTakeWhileDoesntLeakErrors() { public Subscription call(Observer observer) { observer.onNext("one"); - observer.onError(new Exception("test failed")); + observer.onError(new Throwable("test failed")); return Subscriptions.empty(); } }); @@ -272,7 +272,7 @@ public Boolean call(String s) // wait for the Observable to complete try { source.t.join(); - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); } @@ -301,7 +301,7 @@ public Boolean call(String s, Integer index) // wait for the Observable to complete try { w.t.join(); - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); } @@ -338,7 +338,7 @@ public void run() { observer.onNext(s); } observer.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { throw new RuntimeException(e); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationToFuture.java b/rxjava-core/src/main/java/rx/operators/OperationToFuture.java index 6a024a0d6d2..5f9b86d7cf3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToFuture.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToFuture.java @@ -41,7 +41,7 @@ public static Future toFuture(Observable that) { final CountDownLatch finished = new CountDownLatch(1); final AtomicReference value = new AtomicReference(); - final AtomicReference error = new AtomicReference(); + final AtomicReference error = new AtomicReference(); final Subscription s = that.subscribe(new Observer() { @@ -51,7 +51,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { error.compareAndSet(null, e); finished.countDown(); } @@ -161,7 +161,7 @@ public Subscription call(Observer observer) { try { f.get(); fail("expected exception"); - } catch (Exception e) { + } catch (Throwable e) { assertEquals(TestException.class, e.getCause().getClass()); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java index a5817ac2369..f3c1b210679 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java @@ -59,7 +59,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { // ignore } @@ -86,7 +86,7 @@ public T next() { buf = take(); } if (buf.isOnError()) { - throw Exceptions.propagate(buf.getException()); + throw Exceptions.propagate(buf.getThrowable()); } T result = buf.getValue(); diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java index ae40d781038..e7ee406ceab 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java @@ -50,7 +50,7 @@ public Subscription call(Observer observer) { observer.onNext(value); } observer.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { observer.onError(e); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java index d4179fde33a..0976f9113c5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java @@ -72,7 +72,7 @@ public void testIterable() { verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java index 49cbc5d943f..7258938d116 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java @@ -68,7 +68,7 @@ public void onNext(T value) { list.add(value); } - public void onError(Exception ex) { + public void onError(Throwable ex) { observer.onError(ex); } @@ -85,7 +85,7 @@ public void onCompleted() { // observer.onNext(Collections.unmodifiableList(l)); observer.onNext(l); observer.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { onError(e); } @@ -105,7 +105,7 @@ public void testList() { Observer> aObserver = mock(Observer.class); observable.subscribe(aObserver); verify(aObserver, times(1)).onNext(Arrays.asList("one", "two", "three")); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -125,11 +125,11 @@ public void testListMultipleObservers() { List expected = Arrays.asList("one", "two", "three"); verify(o1, times(1)).onNext(expected); - verify(o1, Mockito.never()).onError(any(Exception.class)); + verify(o1, Mockito.never()).onError(any(Throwable.class)); verify(o1, times(1)).onCompleted(); verify(o2, times(1)).onNext(expected); - verify(o2, Mockito.never()).onError(any(Exception.class)); + verify(o2, Mockito.never()).onError(any(Throwable.class)); verify(o2, times(1)).onCompleted(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableSortedList.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableSortedList.java index 831ddd97250..536def4cd22 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableSortedList.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableSortedList.java @@ -92,7 +92,7 @@ public void onNext(T value) { list.add(value); } - public void onError(Exception ex) { + public void onError(Throwable ex) { observer.onError(ex); } @@ -116,7 +116,7 @@ public int compare(T o1, T o2) { observer.onNext(Collections.unmodifiableList(l)); observer.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { onError(e); } @@ -154,7 +154,7 @@ public void testSortedList() { Observer> aObserver = mock(Observer.class); observable.subscribe(aObserver); verify(aObserver, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5)); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -174,7 +174,7 @@ public Integer call(Integer t1, Integer t2) { Observer> aObserver = mock(Observer.class); observable.subscribe(aObserver); verify(aObserver, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1)); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationWhere.java b/rxjava-core/src/main/java/rx/operators/OperationWhere.java index e667e86d46e..d82b6d829b8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationWhere.java +++ b/rxjava-core/src/main/java/rx/operators/OperationWhere.java @@ -58,7 +58,7 @@ public Boolean call(String t1) { verify(aObserver, Mockito.never()).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, Mockito.never()).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index a8989d4d90e..92d987ffa7d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -115,7 +115,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { a.error(this, e); } @@ -123,7 +123,7 @@ public void onError(Exception e) { public void onNext(T args) { try { a.next(this, args); - } catch (Exception e) { + } catch (Throwable e) { onError(e); } } @@ -189,7 +189,7 @@ void complete(ZipObserver w) { * * @param w */ - void error(ZipObserver w, Exception e) { + void error(ZipObserver w, Throwable e) { if (running.compareAndSet(true, false)) { // this thread succeeded in setting running=false so let's propagate the error observer.onError(e); @@ -308,7 +308,7 @@ public void testCollectionSizeDifferentThanFunction() { Observable w = Observable.create(zip(ws, zipr)); w.subscribe(aObserver); - verify(aObserver, times(1)).onError(any(Exception.class)); + verify(aObserver, times(1)).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, never()).onNext(any(String.class)); } @@ -412,14 +412,14 @@ public void testAggregatorSimple() { InOrder inOrder = inOrder(aObserver); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("helloworld"); a.next(r1, "hello "); a.next(r2, "again"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("hello again"); @@ -457,14 +457,14 @@ public void testAggregatorDifferentSizedResultsWithOnComplete() { InOrder inOrder = inOrder(aObserver); - inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("helloworld"); a.next(r1, "hi"); a.complete(r1); - inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verify(aObserver, never()).onNext(anyString()); } @@ -496,14 +496,14 @@ public void testAggregateMultipleTypes() { InOrder inOrder = inOrder(aObserver); - inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("helloworld"); a.next(r1, "hi"); a.complete(r1); - inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verify(aObserver, never()).onNext(anyString()); } @@ -535,7 +535,7 @@ public void testAggregate3Types() { a.next(r2, 2); a.next(r3, new int[] { 5, 6, 7 }); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("hello2[5, 6, 7]"); } @@ -566,7 +566,7 @@ public void testAggregatorsWithDifferentSizesAndTiming() { a.next(r1, "three"); a.next(r2, "A"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("oneA"); @@ -582,7 +582,7 @@ public void testAggregatorsWithDifferentSizesAndTiming() { verify(aObserver, never()).onNext("E"); a.complete(r2); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -610,7 +610,7 @@ public void testAggregatorError() { a.next(r1, "hello"); a.next(r2, "world"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); @@ -618,7 +618,7 @@ public void testAggregatorError() { a.next(r1, "hello"); a.next(r2, "again"); - verify(aObserver, times(1)).onError(any(Exception.class)); + verify(aObserver, times(1)).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); // we don't want to be called again after an error verify(aObserver, times(0)).onNext("helloagain"); @@ -648,7 +648,7 @@ public void testAggregatorUnsubscribe() { a.next(r1, "hello"); a.next(r2, "world"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); @@ -656,7 +656,7 @@ public void testAggregatorUnsubscribe() { a.next(r1, "hello"); a.next(r2, "again"); - verify(aObserver, times(0)).onError(any(Exception.class)); + verify(aObserver, times(0)).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); // we don't want to be called again after an error verify(aObserver, times(0)).onNext("helloagain"); @@ -690,13 +690,13 @@ public void testAggregatorEarlyCompletion() { InOrder inOrder = inOrder(aObserver); - inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("oneA"); a.complete(r2); - inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verify(aObserver, never()).onNext(anyString()); } @@ -713,7 +713,7 @@ public void testZip2Types() { Observable w = Observable.create(zip(Observable.from("one", "two"), Observable.from(2, 3, 4), zipr)); w.subscribe(aObserver); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("one2"); verify(aObserver, times(1)).onNext("two3"); @@ -732,7 +732,7 @@ public void testZip3Types() { Observable w = Observable.create(zip(Observable.from("one", "two"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }), zipr)); w.subscribe(aObserver); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); verify(aObserver, times(1)).onNext("one2[4, 5, 6]"); verify(aObserver, never()).onNext("two"); @@ -748,7 +748,7 @@ public void testOnNextExceptionInvokesOnError() { Observable w = Observable.create(zip(Observable.from(10, 20, 30), Observable.from(0, 1, 2), zipr)); w.subscribe(aObserver); - verify(aObserver, times(1)).onError(any(Exception.class)); + verify(aObserver, times(1)).onError(any(Throwable.class)); } private Func2 getDivideZipr() { diff --git a/rxjava-core/src/main/java/rx/operators/SafeObserver.java b/rxjava-core/src/main/java/rx/operators/SafeObserver.java index 2d6055a213b..f9a2553d4d9 100644 --- a/rxjava-core/src/main/java/rx/operators/SafeObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObserver.java @@ -56,7 +56,7 @@ public void onCompleted() { if (isFinished.compareAndSet(false, true)) { try { actual.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { // handle errors if the onCompleted implementation fails, not just if the Observable fails onError(e); } @@ -66,11 +66,11 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { if (isFinished.compareAndSet(false, true)) { try { actual.onError(e); - } catch (Exception e2) { + } catch (Throwable e2) { if (e2 instanceof OnErrorNotImplementedException) { /** * onError isn't implemented so throw @@ -105,7 +105,7 @@ public void onNext(T args) { if (!isFinished.get()) { actual.onNext(args); } - } catch (Exception e) { + } catch (Throwable e) { // handle errors if the onNext implementation fails, not just if the Observable fails onError(e); } diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java index 86c1fecb2e3..205dfb0ca7f 100644 --- a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -41,7 +41,7 @@ public void onCompleted() { } @Override - public void onError(final Exception e) { + public void onError(final Throwable e) { enqueue(new Notification(e)); } @@ -73,7 +73,7 @@ public void call() { underlying.onNext(not.getValue()); break; case OnError: - underlying.onError(not.getException()); + underlying.onError(not.getThrowable()); break; case OnCompleted: underlying.onCompleted(); diff --git a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java index b03073acb98..37b36f2c273 100644 --- a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java @@ -89,7 +89,7 @@ public void onNext(T arg) { } } - public void onError(Exception e) { + public void onError(Throwable e) { if (finished || subscription.isUnsubscribed()) { // another thread has already finished us, so we won't proceed return; @@ -145,7 +145,7 @@ public void testSingleThreadedBasic() { verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); // non-deterministic because unsubscribe happens after 'waitToFinish' releases // so commenting out for now as this is not a critical thing to test here @@ -284,7 +284,7 @@ public void runConcurrencyTest() { @SuppressWarnings("unused") int numNextEvents = tw.assertEvents(null); // no check of type since we don't want to test barging results here, just interleaving behavior // System.out.println("Number of events executed: " + numNextEvents); - } catch (Exception e) { + } catch (Throwable e) { fail("Concurrency test failed: " + e.getMessage()); e.printStackTrace(); } finally { @@ -301,7 +301,7 @@ private static void waitOnThreads(Future... futures) { for (Future f : futures) { try { f.get(10, TimeUnit.SECONDS); - } catch (Exception e) { + } catch (Throwable e) { System.err.println("Failed while waiting on future."); e.printStackTrace(); } @@ -351,7 +351,7 @@ public void run() { for (Future f : waitOnThese) { try { f.get(); - } catch (Exception e) { + } catch (Throwable e) { System.err.println("Error while waiting on future in CompletionThread"); } } @@ -394,7 +394,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { events.add(TestConcurrencyObserverEvent.onError); } @@ -489,7 +489,7 @@ public void run() { observer.onNext(s); } observer.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { throw new RuntimeException(e); } } @@ -559,7 +559,7 @@ public void run() { if (concurrentThreads > maxThreads) { maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); } - } catch (Exception e) { + } catch (Throwable e) { observer.onError(e); } finally { threadsRunning.decrementAndGet(); @@ -569,7 +569,7 @@ public void run() { } // we are done spawning threads threadPool.shutdown(); - } catch (Exception e) { + } catch (Throwable e) { throw new RuntimeException(e); } @@ -612,7 +612,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { System.out.println(">>> BusyObserver received onError: " + e.getMessage()); onError = true; } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaErrorHandler.java b/rxjava-core/src/main/java/rx/plugins/RxJavaErrorHandler.java index 042d59a8abb..8b621bb6519 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaErrorHandler.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaErrorHandler.java @@ -19,9 +19,9 @@ import rx.Observer; /** - * Abstract class for defining error handling logic in addition to the normal {@link Observer#onError(Exception)} behavior. + * Abstract class for defining error handling logic in addition to the normal {@link Observer#onError(Throwable)} behavior. *

- * For example, all Exceptions can be logged using this handler even if {@link Observer#onError(Exception)} is ignored or not provided when an {@link Observable} is subscribed to. + * For example, all Exceptions can be logged using this handler even if {@link Observer#onError(Throwable)} is ignored or not provided when an {@link Observable} is subscribed to. *

* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: https://github.com/Netflix/RxJava/wiki/Plugins. @@ -29,12 +29,12 @@ public abstract class RxJavaErrorHandler { /** - * Receives all Exceptions from an {@link Observable} passed to {@link Observer#onError(Exception)}. + * Receives all Exceptions from an {@link Observable} passed to {@link Observer#onError(Throwable)}. * * @param e * Exception */ - public void handleError(Exception e) { + public void handleError(Throwable e) { // do nothing by default } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java index eac11fa79f6..2aba6edecd1 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java @@ -69,18 +69,18 @@ public Subscription onSubscribeReturn(Observable observableInstance, Subs } /** - * Invoked after failed execution of {@link Observable#subscribe(Observer)} with thrown Exception. + * Invoked after failed execution of {@link Observable#subscribe(Observer)} with thrown Throwable. *

- * This is NOT errors emitted via {@link Observer#onError(Exception)} but exceptions thrown when attempting + * This is NOT errors emitted via {@link Observer#onError(Throwable)} but exceptions thrown when attempting * to subscribe to a {@link Func1}<{@link Observer}{@code }, {@link Subscription}>. * * @param observableInstance * The executing {@link Observable} instance. * @param e - * Exception thrown by {@link Observable#subscribe(Observer)} - * @return Exception that can be decorated, replaced or just returned as a pass-thru. + * Throwable thrown by {@link Observable#subscribe(Observer)} + * @return Throwable that can be decorated, replaced or just returned as a pass-thru. */ - public Exception onSubscribeError(Observable observableInstance, Exception e) { + public Throwable onSubscribeError(Observable observableInstance, Throwable e) { // pass-thru by default return e; } diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index f4978831c20..b154702ff85 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -113,7 +113,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { for (Observer observer : observers.values()) { observer.onError(e); } @@ -126,7 +126,7 @@ public void onNext(T args) { public static class UnitTest { - private final Exception testException = new Exception(); + private final Throwable testException = new Throwable(); @Test public void testNeverCompleted() { @@ -169,7 +169,7 @@ public void testCompleted() { private void assertCompletedObserver(Observer aObserver) { verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -186,7 +186,7 @@ public void testError() { subject.onNext("three"); subject.onError(testException); subject.onNext("four"); - subject.onError(new Exception()); + subject.onError(new Throwable()); subject.onCompleted(); assertErrorObserver(aObserver); @@ -222,7 +222,7 @@ public void testUnsubscribeBeforeCompleted() { private void assertNoOnNextEventsReceived(Observer aObserver) { verify(aObserver, Mockito.never()).onNext(anyString()); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, Mockito.never()).onCompleted(); } @@ -248,7 +248,7 @@ public void call(AsyncSubject DefaultSubject) @Override public void call(AsyncSubject DefaultSubject) { - DefaultSubject.onError(new Exception()); + DefaultSubject.onError(new Throwable()); } }, null); diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index a7c369e5573..bed456b5ba8 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -115,7 +115,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { for (Observer observer : observers.values()) { observer.onError(e); } @@ -131,7 +131,7 @@ public void onNext(T args) { public static class UnitTest { - private final Exception testException = new Exception(); + private final Throwable testException = new Throwable(); @Test public void testThatObserverReceivesDefaultValueIfNothingWasPublished() { @@ -200,7 +200,7 @@ private void assertCompletedObserver(Observer aObserver) { verify(aObserver, times(1)).onNext("default"); verify(aObserver, times(1)).onNext("one"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -249,7 +249,7 @@ public void call(BehaviorSubject DefaultSubject) @Override public void call(BehaviorSubject DefaultSubject) { - DefaultSubject.onError(new Exception()); + DefaultSubject.onError(new Throwable()); } }, new Action1>() { diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 003d839f50a..5fccdb86d26 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -117,7 +117,7 @@ private Subscription checkTerminalState(Observer observer) { if (n.isOnCompleted()) { observer.onCompleted(); } else { - observer.onError(n.getException()); + observer.onError(n.getThrowable()); } return Subscriptions.empty(); } else { @@ -155,7 +155,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { /** * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription. * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the @@ -241,7 +241,7 @@ public void unsubscribe() { sub.unsubscribe(); } - private final Exception testException = new Exception(); + private final Throwable testException = new Throwable(); @Test public void testCompleted() { @@ -262,7 +262,7 @@ public void testCompleted() { subject.onNext("four"); subject.onCompleted(); - subject.onError(new Exception()); + subject.onError(new Throwable()); assertCompletedObserver(aObserver); // todo bug? assertNeverObserver(anotherObserver); @@ -273,14 +273,14 @@ private void assertCompletedObserver(Observer aObserver) verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } private void assertNeverObserver(Observer aObserver) { verify(aObserver, Mockito.never()).onNext(any(String.class)); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, Mockito.never()).onCompleted(); } @@ -302,7 +302,7 @@ public void testError() { subject.subscribe(anotherObserver); subject.onNext("four"); - subject.onError(new Exception()); + subject.onError(new Throwable()); subject.onCompleted(); assertErrorObserver(aObserver); @@ -347,7 +347,7 @@ private void assertCompletedStartingWithThreeObserver(Observer aObserver verify(aObserver, Mockito.never()).onNext("one"); verify(aObserver, Mockito.never()).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -381,7 +381,7 @@ private void assertObservedUntilTwo(Observer aObserver) verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, Mockito.never()).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, Mockito.never()).onCompleted(); } @@ -411,7 +411,7 @@ public void testUnsubscribeAfterOnCompleted() { inOrder.verify(anObserver, times(1)).onNext("one"); inOrder.verify(anObserver, times(1)).onNext("two"); inOrder.verify(anObserver, times(1)).onCompleted(); - inOrder.verify(anObserver, Mockito.never()).onError(any(Exception.class)); + inOrder.verify(anObserver, Mockito.never()).onError(any(Throwable.class)); @SuppressWarnings("unchecked") Observer anotherObserver = mock(Observer.class); @@ -421,7 +421,7 @@ public void testUnsubscribeAfterOnCompleted() { inOrder.verify(anotherObserver, Mockito.never()).onNext("one"); inOrder.verify(anotherObserver, Mockito.never()).onNext("two"); inOrder.verify(anotherObserver, times(1)).onCompleted(); - inOrder.verify(anotherObserver, Mockito.never()).onError(any(Exception.class)); + inOrder.verify(anotherObserver, Mockito.never()).onError(any(Throwable.class)); } @Test @@ -476,7 +476,7 @@ public void call(PublishSubject DefaultSubject) @Override public void call(PublishSubject DefaultSubject) { - DefaultSubject.onError(new Exception()); + DefaultSubject.onError(new Throwable()); } }, new Action1>() { diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 7d9e13bafc6..b6711c15c45 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -61,7 +61,7 @@ public final class ReplaySubject extends Subject { private boolean isDone = false; - private Exception exception = null; + private Throwable exception = null; private final Map> subscriptions = new HashMap>(); private final List history = Collections.synchronizedList(new ArrayList()); @@ -153,7 +153,7 @@ public void onCompleted() } @Override - public void onError(Exception e) + public void onError(Throwable e) { synchronized (subscriptions) { if (isDone) { @@ -181,7 +181,7 @@ public void onNext(T args) public static class UnitTest { - private final Exception testException = new Exception(); + private final Throwable testException = new Throwable(); @SuppressWarnings("unchecked") @Test @@ -198,7 +198,7 @@ public void testCompleted() { subject.onNext("four"); subject.onCompleted(); - subject.onError(new Exception()); + subject.onError(new Throwable()); assertCompletedObserver(o1); @@ -215,7 +215,7 @@ private void assertCompletedObserver(Observer aObserver) inOrder.verify(aObserver, times(1)).onNext("one"); inOrder.verify(aObserver, times(1)).onNext("two"); inOrder.verify(aObserver, times(1)).onNext("three"); - inOrder.verify(aObserver, Mockito.never()).onError(any(Exception.class)); + inOrder.verify(aObserver, Mockito.never()).onError(any(Throwable.class)); inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } @@ -234,7 +234,7 @@ public void testError() { subject.onError(testException); subject.onNext("four"); - subject.onError(new Exception()); + subject.onError(new Throwable()); subject.onCompleted(); assertErrorObserver(aObserver); @@ -307,7 +307,7 @@ private void assertObservedUntilTwo(Observer aObserver) verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, Mockito.never()).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, Mockito.never()).onCompleted(); } @@ -333,7 +333,7 @@ public void call(ReplaySubject repeatSubject) @Override public void call(ReplaySubject repeatSubject) { - repeatSubject.onError(new Exception()); + repeatSubject.onError(new Throwable()); } }, new Action1>() { diff --git a/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java b/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java index acd7fe19193..30674c9a43a 100644 --- a/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java +++ b/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java @@ -96,7 +96,7 @@ public void onCompleted() } @Override - public void onError(Exception e) + public void onError(Throwable e) { test.gotEvent("onError"); } @@ -122,7 +122,7 @@ public void onCompleted() } @Override - public void onError(Exception e) + public void onError(Throwable e) { test.doUnsubscribe("onError"); } @@ -148,7 +148,7 @@ public void onCompleted() } @Override - public void onError(Exception e) + public void onError(Throwable e) { test.gotEvent("onError"); } diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 4753f23acf2..873176c8d43 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -69,13 +69,13 @@ public synchronized void add(Subscription s) { @Override public synchronized void unsubscribe() { if (unsubscribed.compareAndSet(false, true)) { - Collection es = null; + Collection es = null; for (Subscription s : subscriptions) { try { s.unsubscribe(); - } catch (Exception e) { + } catch (Throwable e) { if (es == null) { - es = new ArrayList(); + es = new ArrayList(); } es.add(e); } diff --git a/rxjava-core/src/main/java/rx/util/CompositeException.java b/rxjava-core/src/main/java/rx/util/CompositeException.java index c19a5f5ff04..b233a9f90b6 100644 --- a/rxjava-core/src/main/java/rx/util/CompositeException.java +++ b/rxjava-core/src/main/java/rx/util/CompositeException.java @@ -29,17 +29,17 @@ public class CompositeException extends RuntimeException { private static final long serialVersionUID = 3026362227162912146L; - private final List exceptions; + private final List exceptions; private final String message; - public CompositeException(String messagePrefix, Collection errors) { + public CompositeException(String messagePrefix, Collection errors) { StringBuilder _message = new StringBuilder(); if (messagePrefix != null) { _message.append(messagePrefix).append(" => "); } - List _exceptions = new ArrayList(); - for (Exception e : errors) { + List _exceptions = new ArrayList(); + for (Throwable e : errors) { _exceptions.add(e); if (_message.length() > 0) { _message.append(", "); @@ -50,11 +50,11 @@ public CompositeException(String messagePrefix, Collection errors) { this.message = _message.toString(); } - public CompositeException(Collection errors) { + public CompositeException(Collection errors) { this(null, errors); } - public List getExceptions() { + public List getExceptions() { return exceptions; } diff --git a/rxjava-core/src/main/java/rx/util/OnErrorNotImplementedException.java b/rxjava-core/src/main/java/rx/util/OnErrorNotImplementedException.java index b84d32bb289..d6124f65d34 100644 --- a/rxjava-core/src/main/java/rx/util/OnErrorNotImplementedException.java +++ b/rxjava-core/src/main/java/rx/util/OnErrorNotImplementedException.java @@ -3,7 +3,7 @@ import rx.Observer; /** - * Used for re-throwing {@link Observer#onError(Exception)} when an implementation doesn't exist. + * Used for re-throwing {@link Observer#onError(Throwable)} when an implementation doesn't exist. * * https://github.com/Netflix/RxJava/issues/198 * diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 53671c210d6..4486e7bd26b 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -52,7 +52,7 @@ private static boolean loadLanguageAdaptor(String name) { } catch (ClassNotFoundException e) { System.err.println("RxJava => Could not find function language adaptor: " + name + " with path: " + className); return false; - } catch (Exception e) { + } catch (Throwable e) { System.err.println("RxJava => Failed trying to initialize function language adaptor: " + className); e.printStackTrace(); return false; diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index 661c423fe63..71a9678c676 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -330,7 +330,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { System.out.println("Error"); } @@ -491,7 +491,7 @@ public Subscription call(final Observer observer) { private static class ConcurrentObserverValidator implements Observer { final AtomicInteger concurrentCounter = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); + final AtomicReference error = new AtomicReference(); final CountDownLatch completed = new CountDownLatch(1); @Override @@ -500,7 +500,7 @@ public void onCompleted() { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { completed.countDown(); error.set(e); } diff --git a/rxjava-core/src/test/java/rx/performance/PerformanceTest.java b/rxjava-core/src/test/java/rx/performance/PerformanceTest.java index 7928776f9aa..9d26581e6b0 100644 --- a/rxjava-core/src/test/java/rx/performance/PerformanceTest.java +++ b/rxjava-core/src/test/java/rx/performance/PerformanceTest.java @@ -231,7 +231,7 @@ public void onNext(Integer i) { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { e.printStackTrace(); } @@ -260,7 +260,7 @@ public void onNext(String i) { } @Override - public void onError(Exception e) { + public void onError(Throwable e) { e.printStackTrace(); }