diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMaterialize.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMaterialize.java index 98f5c4ae6a..9ee684a589 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMaterialize.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMaterialize.java @@ -18,6 +18,7 @@ import rx.Notification; import rx.Observable.Operator; import rx.Subscriber; +import rx.plugins.RxJavaPlugins; /** * Turns all of the notifications from an Observable into onNext emissions, and marks @@ -42,6 +43,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); child.onNext(Notification. createOnError(e)); child.onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java index 3985e4b083..fe57ac8354 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java @@ -125,10 +125,12 @@ public void onError(Throwable e) { public void onCompleted() { complete(); } + void error(Throwable e) { exceptions.add(e); complete(); } + void complete() { if (WIP_UPDATER.decrementAndGet(this) == 0) { if (exceptions.isEmpty()) { diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorFlatMap.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorFlatMap.java index e7750785c3..f8e56971f8 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorFlatMap.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorFlatMap.java @@ -20,6 +20,7 @@ import rx.Subscriber; import rx.exceptions.OnErrorThrowable; import rx.functions.Func1; +import rx.plugins.RxJavaPlugins; /** * Allows inserting onNext events into a stream when onError events are received @@ -46,6 +47,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { try { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); Observable resume = resumeFunction.call(OnErrorThrowable.from(e)); resume.unsafeSubscribe(new Subscriber() { diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java index c1a5c8bb73..a2b71258f0 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java @@ -19,6 +19,7 @@ import rx.Observable.Operator; import rx.Subscriber; import rx.functions.Func1; +import rx.plugins.RxJavaPlugins; /** * Instruct an Observable to pass control to another Observable (the return value of a function) @@ -59,6 +60,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { try { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); Observable resume = resumeFunction.call(e); resume.unsafeSubscribe(child); } catch (Throwable e2) { diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java index efee5c8c1b..56091cb005 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java @@ -18,6 +18,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; +import rx.plugins.RxJavaPlugins; /** * Instruct an Observable to pass control to another Observable rather than invoking @@ -58,6 +59,7 @@ public void onNext(T t) { @Override public void onError(Throwable e) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); unsubscribe(); resumeSequence.unsafeSubscribe(child); } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java index f96441a76f..bd1536ff75 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java @@ -20,6 +20,7 @@ import rx.Subscriber; import rx.exceptions.CompositeException; import rx.functions.Func1; +import rx.plugins.RxJavaPlugins; /** * Instruct an Observable to emit a particular item to its Observer's onNext method @@ -59,6 +60,7 @@ public void onNext(T t) { @Override public void onError(Throwable e) { try { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); T result = resultFunction.call(e); child.onNext(result); diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java index db3a344cce..78008e56d0 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java @@ -18,6 +18,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; +import rx.plugins.RxJavaPlugins; /** * Instruct an Observable to pass control to another Observable rather than invoking @@ -63,6 +64,7 @@ public void onNext(T t) { @Override public void onError(Throwable e) { if (e instanceof Exception) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); unsubscribe(); resumeSequence.unsafeSubscribe(child); } else {