Skip to content

Commit

Permalink
Merge pull request #3155 from davidmoten/safe-sub-catch
Browse files Browse the repository at this point in the history
SafeSubscriber - report onCompleted unsubscribe error to RxJavaPlugin
  • Loading branch information
akarnokd committed Aug 24, 2015
2 parents 189928c + 5fec06f commit d71314e
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 66 deletions.
29 changes: 29 additions & 0 deletions src/main/java/rx/exceptions/OnCompletedFailedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.exceptions;

public final class OnCompletedFailedException extends RuntimeException {

private static final long serialVersionUID = 8622579378868820554L;

public OnCompletedFailedException(Throwable throwable) {
super(throwable);
}

public OnCompletedFailedException(String message, Throwable throwable) {
super(message, throwable);
}
}
30 changes: 30 additions & 0 deletions src/main/java/rx/exceptions/UnsubscribeFailedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.exceptions;

public final class UnsubscribeFailedException extends RuntimeException {

private static final long serialVersionUID = 4594672310593167598L;

public UnsubscribeFailedException(Throwable throwable) {
super(throwable);
}

public UnsubscribeFailedException(String message, Throwable throwable) {
super(message, throwable);
}

}
40 changes: 40 additions & 0 deletions src/main/java/rx/internal/util/RxJavaPluginUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.util;

import rx.plugins.RxJavaPlugins;

public final class RxJavaPluginUtils {

public static void handleException(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
}

private static void handlePluginException(Throwable pluginException) {
/*
* We don't want errors from the plugin to affect normal flow.
* Since the plugin should never throw this is a safety net
* and will complain loudly to System.err so it gets fixed.
*/
System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
pluginException.printStackTrace();
}

}
58 changes: 18 additions & 40 deletions src/main/java/rx/observers/SafeSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.exceptions.OnCompletedFailedException;
import rx.exceptions.OnErrorFailedException;
import rx.exceptions.OnErrorNotImplementedException;
import rx.plugins.RxJavaPlugins;
import rx.exceptions.UnsubscribeFailedException;
import rx.internal.util.RxJavaPluginUtils;

/**
* {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber}
Expand Down Expand Up @@ -83,11 +85,17 @@ public void onCompleted() {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
// handle errors if the onCompleted implementation fails, not just if the Observable fails
_onError(e);
RxJavaPluginUtils.handleException(e);
throw new OnCompletedFailedException(e.getMessage(), e);
} finally {
// auto-unsubscribe
unsubscribe();
try {
// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
// and we throw an UnsubscribeFailureException.
unsubscribe();
} catch (Throwable e) {
RxJavaPluginUtils.handleException(e);
throw new UnsubscribeFailedException(e.getMessage(), e);
}
}
}
}
Expand Down Expand Up @@ -145,11 +153,7 @@ public void onNext(T args) {
* @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
*/
protected void _onError(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(e);
try {
actual.onError(e);
} catch (Throwable e2) {
Expand All @@ -168,11 +172,7 @@ protected void _onError(Throwable e) {
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(unsubscribeException);
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
}
throw (OnErrorNotImplementedException) e2;
Expand All @@ -182,19 +182,11 @@ protected void _onError(Throwable e) {
*
* https://github.com/ReactiveX/RxJava/issues/198
*/
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(e2);
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}

Expand All @@ -205,25 +197,11 @@ protected void _onError(Throwable e) {
try {
unsubscribe();
} catch (RuntimeException unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException(unsubscribeException);
}
}

private void handlePluginException(Throwable pluginException) {
/*
* We don't want errors from the plugin to affect normal flow.
* Since the plugin should never throw this is a safety net
* and will complain loudly to System.err so it gets fixed.
*/
System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
pluginException.printStackTrace();
}

/**
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
*
Expand Down
27 changes: 9 additions & 18 deletions src/test/java/rx/observers/SafeObserverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.junit.Test;

import junit.framework.Assert;
import rx.Subscriber;
import rx.exceptions.*;
import rx.functions.Action0;
Expand Down Expand Up @@ -68,19 +69,6 @@ public void onCompletedFailure() {
}
}

@Test
public void onCompletedFailureSafe() {
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
try {
new SafeSubscriber<String>(OBSERVER_ONCOMPLETED_FAIL(onError)).onCompleted();
assertNotNull(onError.get());
assertTrue(onError.get() instanceof SafeObserverTestException);
assertEquals("onCompletedFail", onError.get().getMessage());
} catch (Exception e) {
fail("expects exception to be passed to onError");
}
}

@Test
public void onErrorFailure() {
try {
Expand Down Expand Up @@ -184,8 +172,8 @@ public void call() {
e.printStackTrace();

assertTrue(o.isUnsubscribed());

assertTrue(e instanceof SafeObserverTestException);
assertTrue(e instanceof UnsubscribeFailedException);
assertTrue(e.getCause() instanceof SafeObserverTestException);
assertEquals("failure from unsubscribe", e.getMessage());
// expected since onError fails so SafeObserver can't help
}
Expand Down Expand Up @@ -475,9 +463,12 @@ public void onCompleted() {
}
});

s.onCompleted();

assertTrue("Error not received", error.get() instanceof TestException);
try {
s.onCompleted();
Assert.fail();
} catch (OnCompletedFailedException e) {
assertNull(error.get());
}
}

@Test
Expand Down
Loading

0 comments on commit d71314e

Please sign in to comment.