Skip to content

Commit

Permalink
Merge pull request #730 from benjchristensen/unsubscribe-error-handling
Browse files Browse the repository at this point in the history
Improve Error Handling and Stacktraces When Unsubscribe Fails
  • Loading branch information
benjchristensen committed Jan 9, 2014
2 parents 72f043e + 8e6bef3 commit 0b1b6e7
Show file tree
Hide file tree
Showing 7 changed files with 433 additions and 57 deletions.
14 changes: 0 additions & 14 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
handleError(e);
throw new OnErrorNotImplementedException(e);
}

Expand Down Expand Up @@ -373,7 +372,6 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
handleError(e);
throw new OnErrorNotImplementedException(e);
}

Expand Down Expand Up @@ -430,7 +428,6 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
handleError(e);
onError.call(e);
}

Expand Down Expand Up @@ -491,7 +488,6 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
handleError(e);
onError.call(e);
}

Expand Down Expand Up @@ -561,16 +557,6 @@ public <TIntermediate, TResult> Observable<TResult> multicast(
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
return OperationMulticast.multicast(this, subjectFactory, selector);
}
/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from
* onError.
*
* @param e
*/
private void handleError(Throwable e) {
// onError should be rare so we'll only fetch when needed
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
}

/**
* An Observable that never sends any information to an {@link Observer}.
Expand Down
34 changes: 26 additions & 8 deletions rxjava-core/src/main/java/rx/operators/SafeObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ public void onNext(T args) {
*/
protected void _onError(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
actual.onError(e);
} catch (Throwable e2) {
if (e2 instanceof OnErrorNotImplementedException) {
/**
/*
* onError isn't implemented so throw
*
* https://github.com/Netflix/RxJava/issues/198
Expand All @@ -128,19 +129,36 @@ protected void _onError(Throwable e) {
* to rethrow the exception on the thread that the message comes out from the observable sequence.
* The OnCompleted behavior in this case is to do nothing."
*/
try {
subscription.unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
}
throw (OnErrorNotImplementedException) e2;
} else {
// if the onError itself fails then pass to the plugin
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
/*
* throw since the Rx contract is broken if onError failed
*
* https://github.com/Netflix/RxJava/issues/198
*/
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
// and throw exception despite that not being proper for Rx
// https://github.com/Netflix/RxJava/issues/198
try {
subscription.unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}

throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
} finally {
// auto-unsubscribe
}
// if we did not throw about we will unsubscribe here, if onError failed then unsubscribe happens in the catch
try {
subscription.unsubscribe();
} catch (RuntimeException unsubscribeException) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
throw unsubscribeException;
}
}

Expand Down
2 changes: 2 additions & 0 deletions rxjava-core/src/main/java/rx/plugins/RxJavaErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public abstract class RxJavaErrorHandler {

/**
* Receives all Exceptions from an {@link Observable} passed to {@link Observer#onError(Throwable)}.
* <p>
* This should NEVER throw an Exception. Make sure to try/catch(Throwable) all code inside this method implementation.
*
* @param e
* Exception
Expand Down
15 changes: 10 additions & 5 deletions rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@ public class RxJavaPlugins {
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();

public static RxJavaPlugins getInstance() {
return INSTANCE;
}

/* package accessible for unit tests */RxJavaPlugins() {

}

public static RxJavaPlugins getInstance() {
return INSTANCE;

/* package accessible for ujnit tests */ void reset() {
INSTANCE.errorHandler.set(null);
INSTANCE.observableExecutionHook.set(null);
}

/**
Expand Down Expand Up @@ -74,7 +79,7 @@ public RxJavaErrorHandler getErrorHandler() {
*/
public void registerErrorHandler(RxJavaErrorHandler impl) {
if (!errorHandler.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered.");
throw new IllegalStateException("Another strategy was already registered: " + errorHandler.get());
}
}

Expand Down Expand Up @@ -112,7 +117,7 @@ public RxJavaObservableExecutionHook getObservableExecutionHook() {
*/
public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl) {
if (!observableExecutionHook.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered.");
throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook.get());
}
}

Expand Down
66 changes: 55 additions & 11 deletions rxjava-core/src/main/java/rx/util/CompositeException.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,26 @@
* <p>
* The <code>getMessage()</code> will return a concatenation of the composite exceptions.
*/
public class CompositeException extends RuntimeException {
public final class CompositeException extends RuntimeException {

private static final long serialVersionUID = 3026362227162912146L;

private final List<Throwable> exceptions;
private final String message;
private final Throwable cause;

public CompositeException(String messagePrefix, Collection<Throwable> errors) {
StringBuilder _message = new StringBuilder();
if (messagePrefix != null) {
_message.append(messagePrefix).append(" => ");
}

List<Throwable> _exceptions = new ArrayList<Throwable>();
CompositeExceptionCausalChain _cause = new CompositeExceptionCausalChain();
int count = 0;
for (Throwable e : errors) {
count++;
attachCallingThreadStack(_cause, e);
_exceptions.add(e);
if (_message.length() > 0) {
_message.append(", ");
}
_message.append(e.getClass().getSimpleName()).append(":").append(e.getMessage());
}
this.exceptions = Collections.unmodifiableList(_exceptions);
this.message = _message.toString();
this.message = count + " exceptions occurred. See them in causal chain below.";
this.cause = _cause;
}

public CompositeException(Collection<Throwable> errors) {
Expand All @@ -62,4 +59,51 @@ public List<Throwable> getExceptions() {
public String getMessage() {
return message;
}

@Override
public synchronized Throwable getCause() {
return cause;
}

@SuppressWarnings("unused") // useful when debugging but don't want to make part of publicly supported API
private static String getStackTraceAsString(StackTraceElement[] stack) {
StringBuilder s = new StringBuilder();
boolean firstLine = true;
for (StackTraceElement e : stack) {
if (e.toString().startsWith("java.lang.Thread.getStackTrace")) {
// we'll ignore this one
continue;
}
if (!firstLine) {
s.append("\n\t");
}
s.append(e.toString());
firstLine = false;
}
return s.toString();
}

private static void attachCallingThreadStack(Throwable e, Throwable cause) {
while (e.getCause() != null) {
e = e.getCause();
}
// we now have 'e' as the last in the chain
try {
e.initCause(cause);
} catch (Throwable t) {
// ignore
// the javadocs say that some Throwables (depending on how they're made) will never
// let me call initCause without blowing up even if it returns null
}
}

private final static class CompositeExceptionCausalChain extends RuntimeException {
private static final long serialVersionUID = 3875212506787802066L;

@Override
public String getMessage() {
return "Chain of Causes for CompositeException In Order Received =>";
}
}

}
Loading

0 comments on commit 0b1b6e7

Please sign in to comment.