Skip to content

Commit

Permalink
Merge pull request #1306 from mattrjacobs/add-error-handler-to-error-…
Browse files Browse the repository at this point in the history
…swallowing-operators

Hooked RxJavaPlugins errorHandler up within all operators that swallow onErrors
  • Loading branch information
benjchristensen committed Jun 2, 2014
2 parents c375f56 + 3cf18a1 commit 6417f0b
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Notification;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.plugins.RxJavaPlugins;

/**
* Turns all of the notifications from an Observable into <code>onNext</code> emissions, and marks
Expand All @@ -42,6 +43,7 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
child.onNext(Notification.<T> createOnError(e));
child.onCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ public void onError(Throwable e) {
public void onCompleted() {
complete();
}

void error(Throwable e) {
exceptions.add(e);
complete();
}

void complete() {
if (WIP_UPDATER.decrementAndGet(this) == 0) {
if (exceptions.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func1;
import rx.plugins.RxJavaPlugins;

/**
* Allows inserting onNext events into a stream when onError events are received
Expand All @@ -46,6 +47,7 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
Observable<? extends T> resume = resumeFunction.call(OnErrorThrowable.from(e));
resume.unsafeSubscribe(new Subscriber<T>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;
import rx.plugins.RxJavaPlugins;

/**
* Instruct an Observable to pass control to another Observable (the return value of a function)
Expand Down Expand Up @@ -59,6 +60,7 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
Observable<? extends T> resume = resumeFunction.call(e);
resume.unsafeSubscribe(child);
} catch (Throwable e2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.plugins.RxJavaPlugins;

/**
* Instruct an Observable to pass control to another Observable rather than invoking
Expand Down Expand Up @@ -58,6 +59,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable e) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
unsubscribe();
resumeSequence.unsafeSubscribe(child);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.functions.Func1;
import rx.plugins.RxJavaPlugins;

/**
* Instruct an Observable to emit a particular item to its Observer's <code>onNext</code> method
Expand Down Expand Up @@ -59,6 +60,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
T result = resultFunction.call(e);

child.onNext(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.plugins.RxJavaPlugins;

/**
* Instruct an Observable to pass control to another Observable rather than invoking
Expand Down Expand Up @@ -63,6 +64,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable e) {
if (e instanceof Exception) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
unsubscribe();
resumeSequence.unsafeSubscribe(child);
} else {
Expand Down

0 comments on commit 6417f0b

Please sign in to comment.