Skip to content

Commit

Permalink
Merge 0.20.x into 1.x
Browse files Browse the repository at this point in the history
ReactiveX#1632 Composite Exception - Circular Reference Handling
ReactiveX#1631 Handle Fatal Exceptions in doOnEach
  • Loading branch information
benjchristensen committed Aug 27, 2014
2 parents 0aab682 + 25e1805 commit dd56779
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 111 deletions.
247 changes: 174 additions & 73 deletions rxjava/src/main/java/rx/exceptions/CompositeException.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,61 @@
*/
package rx.exceptions;

import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
* Exception that is a composite of 1 or more other exceptions.
* <p>
* Use <code>getMessage()</code> to retrieve a concatenation of the composite exceptions.
* An Exception that is a composite of one or more other Exceptions. A {@code CompositeException} does not
* modify the structure of any exception it wraps, but at print-time it iterates through the list of
* Throwables contained in the composit in order to print them all.
*
* Its invariant is to contain an immutable, ordered (by insertion order), unique list of non-composite
* exceptions. You can retrieve individual exceptions in this list with {@link #getExceptions()}.
*
* The {@link #printStackTrace()} implementation handles the StackTrace in a customized way instead of using
* {@code getCause()} so that it can avoid circular references.
*
* If you invoke {@link #getCause()}, it will lazily create the causal chain but will stop if it finds any
* Throwable in the chain that it has already seen.
*/
public final class CompositeException extends RuntimeException {

private static final long serialVersionUID = 3026362227162912146L;

private final List<Throwable> exceptions;
private final String message;
private final Throwable cause;

public CompositeException(String messagePrefix, Collection<Throwable> errors) {
public CompositeException(String messagePrefix, Collection<? extends Throwable> errors) {
Set<Throwable> deDupedExceptions = new LinkedHashSet<Throwable>();
List<Throwable> _exceptions = new ArrayList<Throwable>();
CompositeExceptionCausalChain _cause = new CompositeExceptionCausalChain();
int count = errors.size();
errors = removeDuplicatedCauses(errors);
for (Throwable e : errors) {
attachCallingThreadStack(_cause, e);
_exceptions.add(e);
for (Throwable ex : errors) {
if (ex instanceof CompositeException) {
deDupedExceptions.addAll(((CompositeException) ex).getExceptions());
} else {
deDupedExceptions.add(ex);
}
}

_exceptions.addAll(deDupedExceptions);
this.exceptions = Collections.unmodifiableList(_exceptions);

String msg = count + " exceptions occurred. See them in causal chain below.";
if(messagePrefix != null) {
msg = messagePrefix + " " + msg;
}
this.message = msg;
this.cause = _cause;
this.message = exceptions.size() + " exceptions occurred. ";
}

public CompositeException(Collection<Throwable> errors) {
public CompositeException(Collection<? extends Throwable> errors) {
this(null, errors);
}

/**
* Retrieves the list of exceptions that make up the {@code CompositeException}
*
* @return the exceptions that make up the {@code CompositeException}, as a {@link List} of
* {@link Throwable}s
* @return the exceptions that make up the {@code CompositeException}, as a {@link List} of {@link Throwable}s
*/
public List<Throwable> getExceptions() {
return exceptions;
Expand All @@ -73,82 +80,176 @@ public String getMessage() {
return message;
}

private Throwable cause = null;

@Override
public synchronized Throwable getCause() {
return cause;
}

private Collection<Throwable> removeDuplicatedCauses(Collection<Throwable> errors) {
Set<Throwable> duplicated = new HashSet<Throwable>();
for (Throwable cause : errors) {
for (Throwable error : errors) {
if(cause == error || duplicated.contains(error)) {
if (cause == null) {
// we lazily generate this causal chain if this is called
CompositeExceptionCausalChain _cause = new CompositeExceptionCausalChain();
Set<Throwable> seenCauses = new HashSet<Throwable>();

Throwable chain = _cause;
for (Throwable e : exceptions) {
if (seenCauses.contains(e)) {
// already seen this outer Throwable so skip
continue;
}
while (error.getCause() != null) {
error = error.getCause();
if (error == cause) {
duplicated.add(cause);
break;
seenCauses.add(e);

List<Throwable> listOfCauses = getListOfCauses(e);
// check if any of them have been seen before
for(Throwable child : listOfCauses) {
if (seenCauses.contains(child)) {
// already seen this outer Throwable so skip
e = new RuntimeException("Duplicate found in causal chain so cropping to prevent loop ...");
continue;
}
seenCauses.add(child);
}

// we now have 'e' as the last in the chain
try {
chain.initCause(e);
} catch (Throwable t) {
// ignore
// the javadocs say that some Throwables (depending on how they're made) will never
// let me call initCause without blowing up even if it returns null
}
chain = chain.getCause();
}
cause = _cause;
}
return cause;
}

/**
* All of the following {@code printStackTrace} functionality is derived from JDK {@link Throwable}
* {@code printStackTrace}. In particular, the {@code PrintStreamOrWriter} abstraction is copied wholesale.
*
* Changes from the official JDK implementation:<ul>
* <li>no infinite loop detection</li>
* <li>smaller critical section holding {@link PrintStream} lock</li>
* <li>explicit knowledge about the exceptions {@link List} that this loops through</li>
* </ul>
*/
@Override
public void printStackTrace() {
printStackTrace(System.err);
}

@Override
public void printStackTrace(PrintStream s) {
printStackTrace(new WrappedPrintStream(s));
}

@Override
public void printStackTrace(PrintWriter s) {
printStackTrace(new WrappedPrintWriter(s));
}

/**
* Special handling for printing out a {@code CompositeException}.
* Loops through all inner exceptions and prints them out.
*
* @param s
* stream to print to
*/
private void printStackTrace(PrintStreamOrWriter s) {
StringBuilder bldr = new StringBuilder();
bldr.append(this).append("\n");
for (StackTraceElement myStackElement : getStackTrace()) {
bldr.append("\tat ").append(myStackElement).append("\n");
}
if (!duplicated.isEmpty()) {
errors = new ArrayList<Throwable>(errors);
errors.removeAll(duplicated);
int i = 1;
for (Throwable ex : exceptions) {
bldr.append(" ComposedException ").append(i).append(" :").append("\n");
appendStackTrace(bldr, ex, "\t");
i++;
}
synchronized (s.lock()) {
s.println(bldr.toString());
}
return errors;
}

@SuppressWarnings("unused")
// useful when debugging but don't want to make part of publicly supported API
private static String getStackTraceAsString(StackTraceElement[] stack) {
StringBuilder s = new StringBuilder();
boolean firstLine = true;
for (StackTraceElement e : stack) {
if (e.toString().startsWith("java.lang.Thread.getStackTrace")) {
// we'll ignore this one
continue;
}
if (!firstLine) {
s.append("\n\t");
}
s.append(e.toString());
firstLine = false;
private void appendStackTrace(StringBuilder bldr, Throwable ex, String prefix) {
bldr.append(prefix).append(ex).append("\n");
for (StackTraceElement stackElement : ex.getStackTrace()) {
bldr.append("\t\tat ").append(stackElement).append("\n");
}
if (ex.getCause() != null) {
bldr.append("\tCaused by: ");
appendStackTrace(bldr, ex.getCause(), "");
}
return s.toString();
}

/* package-private */ static void attachCallingThreadStack(Throwable e, Throwable cause) {
Set<Throwable> seenCauses = new HashSet<Throwable>();
private abstract static class PrintStreamOrWriter {
/** Returns the object to be locked when using this StreamOrWriter */
abstract Object lock();

while (e.getCause() != null) {
e = e.getCause();
if (seenCauses.contains(e.getCause())) {
break;
} else {
seenCauses.add(e.getCause());
}
/** Prints the specified string as a line on this StreamOrWriter */
abstract void println(Object o);
}

/**
* Same abstraction and implementation as in JDK to allow PrintStream and PrintWriter to share implementation
*/
private static class WrappedPrintStream extends PrintStreamOrWriter {
private final PrintStream printStream;

WrappedPrintStream(PrintStream printStream) {
this.printStream = printStream;
}

Object lock() {
return printStream;
}

void println(Object o) {
printStream.println(o);
}
}

private static class WrappedPrintWriter extends PrintStreamOrWriter {
private final PrintWriter printWriter;

WrappedPrintWriter(PrintWriter printWriter) {
this.printWriter = printWriter;
}
// we now have 'e' as the last in the chain
try {
e.initCause(cause);
} catch (Throwable t) {
// ignore
// the javadocs say that some Throwables (depending on how they're made) will never
// let me call initCause without blowing up even if it returns null

Object lock() {
return printWriter;
}

void println(Object o) {
printWriter.println(o);
}
}

/* package-private */ final static class CompositeExceptionCausalChain extends RuntimeException {
/* package-private */final static class CompositeExceptionCausalChain extends RuntimeException {
private static final long serialVersionUID = 3875212506787802066L;
/* package-private */ static String MESSAGE = "Chain of Causes for CompositeException In Order Received =>";
/* package-private */static String MESSAGE = "Chain of Causes for CompositeException In Order Received =>";

@Override
public String getMessage() {
return MESSAGE;
}
}

}
private final List<Throwable> getListOfCauses(Throwable ex) {
List<Throwable> list = new ArrayList<Throwable>();
Throwable root = ex.getCause();
if (root == null) {
return list;
} else {
while(true) {
list.add(root);
if (root.getCause() == null) {
return list;
} else {
root = root.getCause();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorThrowable;

/**
Expand Down Expand Up @@ -54,6 +55,8 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
// need to throwIfFatal since we swallow errors after terminated
Exceptions.throwIfFatal(e);
if (done) {
return;
}
Expand Down
Loading

0 comments on commit dd56779

Please sign in to comment.