diff --git a/src/main/java/rx/exceptions/OnCompletedFailedException.java b/src/main/java/rx/exceptions/OnCompletedFailedException.java
new file mode 100644
index 0000000000..37632d86c6
--- /dev/null
+++ b/src/main/java/rx/exceptions/OnCompletedFailedException.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.exceptions;
+
+public final class OnCompletedFailedException extends RuntimeException {
+
+ private static final long serialVersionUID = 8622579378868820554L;
+
+ public OnCompletedFailedException(Throwable throwable) {
+ super(throwable);
+ }
+
+ public OnCompletedFailedException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+}
diff --git a/src/main/java/rx/exceptions/UnsubscribeFailedException.java b/src/main/java/rx/exceptions/UnsubscribeFailedException.java
new file mode 100644
index 0000000000..8b01df8aa3
--- /dev/null
+++ b/src/main/java/rx/exceptions/UnsubscribeFailedException.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.exceptions;
+
+public final class UnsubscribeFailedException extends RuntimeException {
+
+ private static final long serialVersionUID = 4594672310593167598L;
+
+ public UnsubscribeFailedException(Throwable throwable) {
+ super(throwable);
+ }
+
+ public UnsubscribeFailedException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+
+}
diff --git a/src/main/java/rx/internal/util/RxJavaPluginUtils.java b/src/main/java/rx/internal/util/RxJavaPluginUtils.java
new file mode 100644
index 0000000000..b6b462412c
--- /dev/null
+++ b/src/main/java/rx/internal/util/RxJavaPluginUtils.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package rx.internal.util;
+
+import rx.plugins.RxJavaPlugins;
+
+public final class RxJavaPluginUtils {
+
+ public static void handleException(Throwable e) {
+ try {
+ RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ } catch (Throwable pluginException) {
+ handlePluginException(pluginException);
+ }
+ }
+
+ private static void handlePluginException(Throwable pluginException) {
+ /*
+ * We don't want errors from the plugin to affect normal flow.
+ * Since the plugin should never throw this is a safety net
+ * and will complain loudly to System.err so it gets fixed.
+ */
+ System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
+ pluginException.printStackTrace();
+ }
+
+}
diff --git a/src/main/java/rx/observers/SafeSubscriber.java b/src/main/java/rx/observers/SafeSubscriber.java
index 0181887c34..8a9aad5179 100644
--- a/src/main/java/rx/observers/SafeSubscriber.java
+++ b/src/main/java/rx/observers/SafeSubscriber.java
@@ -20,9 +20,11 @@
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
+import rx.exceptions.OnCompletedFailedException;
import rx.exceptions.OnErrorFailedException;
import rx.exceptions.OnErrorNotImplementedException;
-import rx.plugins.RxJavaPlugins;
+import rx.exceptions.UnsubscribeFailedException;
+import rx.internal.util.RxJavaPluginUtils;
/**
* {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber}
@@ -83,11 +85,17 @@ public void onCompleted() {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
- // handle errors if the onCompleted implementation fails, not just if the Observable fails
- _onError(e);
+ RxJavaPluginUtils.handleException(e);
+ throw new OnCompletedFailedException(e.getMessage(), e);
} finally {
- // auto-unsubscribe
- unsubscribe();
+ try {
+ // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
+ // and we throw an UnsubscribeFailureException.
+ unsubscribe();
+ } catch (Throwable e) {
+ RxJavaPluginUtils.handleException(e);
+ throw new UnsubscribeFailedException(e.getMessage(), e);
+ }
}
}
}
@@ -145,11 +153,7 @@ public void onNext(T args) {
* @see the report of this bug
*/
protected void _onError(Throwable e) {
- try {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
- } catch (Throwable pluginException) {
- handlePluginException(pluginException);
- }
+ RxJavaPluginUtils.handleException(e);
try {
actual.onError(e);
} catch (Throwable e2) {
@@ -168,11 +172,7 @@ protected void _onError(Throwable e) {
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
- try {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
- } catch (Throwable pluginException) {
- handlePluginException(pluginException);
- }
+ RxJavaPluginUtils.handleException(unsubscribeException);
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
}
throw (OnErrorNotImplementedException) e2;
@@ -182,19 +182,11 @@ protected void _onError(Throwable e) {
*
* https://github.com/ReactiveX/RxJava/issues/198
*/
- try {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
- } catch (Throwable pluginException) {
- handlePluginException(pluginException);
- }
+ RxJavaPluginUtils.handleException(e2);
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
- try {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
- } catch (Throwable pluginException) {
- handlePluginException(pluginException);
- }
+ RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}
@@ -205,25 +197,11 @@ protected void _onError(Throwable e) {
try {
unsubscribe();
} catch (RuntimeException unsubscribeException) {
- try {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
- } catch (Throwable pluginException) {
- handlePluginException(pluginException);
- }
+ RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException(unsubscribeException);
}
}
- private void handlePluginException(Throwable pluginException) {
- /*
- * We don't want errors from the plugin to affect normal flow.
- * Since the plugin should never throw this is a safety net
- * and will complain loudly to System.err so it gets fixed.
- */
- System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
- pluginException.printStackTrace();
- }
-
/**
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
*
diff --git a/src/test/java/rx/observers/SafeObserverTest.java b/src/test/java/rx/observers/SafeObserverTest.java
index 1083e995c7..7924bb4026 100644
--- a/src/test/java/rx/observers/SafeObserverTest.java
+++ b/src/test/java/rx/observers/SafeObserverTest.java
@@ -22,6 +22,7 @@
import org.junit.Test;
+import junit.framework.Assert;
import rx.Subscriber;
import rx.exceptions.*;
import rx.functions.Action0;
@@ -68,19 +69,6 @@ public void onCompletedFailure() {
}
}
- @Test
- public void onCompletedFailureSafe() {
- AtomicReference onError = new AtomicReference();
- try {
- new SafeSubscriber(OBSERVER_ONCOMPLETED_FAIL(onError)).onCompleted();
- assertNotNull(onError.get());
- assertTrue(onError.get() instanceof SafeObserverTestException);
- assertEquals("onCompletedFail", onError.get().getMessage());
- } catch (Exception e) {
- fail("expects exception to be passed to onError");
- }
- }
-
@Test
public void onErrorFailure() {
try {
@@ -184,8 +172,8 @@ public void call() {
e.printStackTrace();
assertTrue(o.isUnsubscribed());
-
- assertTrue(e instanceof SafeObserverTestException);
+ assertTrue(e instanceof UnsubscribeFailedException);
+ assertTrue(e.getCause() instanceof SafeObserverTestException);
assertEquals("failure from unsubscribe", e.getMessage());
// expected since onError fails so SafeObserver can't help
}
@@ -475,9 +463,12 @@ public void onCompleted() {
}
});
- s.onCompleted();
-
- assertTrue("Error not received", error.get() instanceof TestException);
+ try {
+ s.onCompleted();
+ Assert.fail();
+ } catch (OnCompletedFailedException e) {
+ assertNull(error.get());
+ }
}
@Test
diff --git a/src/test/java/rx/observers/SafeSubscriberTest.java b/src/test/java/rx/observers/SafeSubscriberTest.java
index 85c2d7b07f..5ce37cdea4 100644
--- a/src/test/java/rx/observers/SafeSubscriberTest.java
+++ b/src/test/java/rx/observers/SafeSubscriberTest.java
@@ -15,15 +15,25 @@
*/
package rx.observers;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
-import rx.exceptions.*;
+import rx.exceptions.OnCompletedFailedException;
+import rx.exceptions.OnErrorFailedException;
+import rx.exceptions.OnErrorNotImplementedException;
+import rx.exceptions.TestException;
+import rx.exceptions.UnsubscribeFailedException;
import rx.functions.Action0;
-import rx.plugins.*;
+import rx.plugins.RxJavaErrorHandler;
+import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
public class SafeSubscriberTest {
@@ -51,10 +61,12 @@ public void onCompleted() {
}
};
SafeSubscriber safe = new SafeSubscriber(ts);
-
- safe.onCompleted();
-
- assertTrue(safe.isUnsubscribed());
+ try {
+ safe.onCompleted();
+ Assert.fail();
+ } catch (OnCompletedFailedException e) {
+ assertTrue(safe.isUnsubscribed());
+ }
}
@Test
@@ -76,7 +88,7 @@ public void onCompleted() {
assertTrue(safe.isUnsubscribed());
}
- @Test
+ @Test(expected=OnCompletedFailedException.class)
public void testPluginException() {
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
@Override
@@ -227,4 +239,81 @@ public void call() {
safe.onError(new TestException());
}
+
+ @Test
+ public void testPluginErrorHandlerReceivesExceptionWhenUnsubscribeAfterCompletionThrows() {
+ final AtomicInteger calls = new AtomicInteger();
+ RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
+ @Override
+ public void handleError(Throwable e) {
+ calls.incrementAndGet();
+ }
+ });
+
+ final AtomicInteger errors = new AtomicInteger();
+ TestSubscriber ts = new TestSubscriber() {
+ @Override
+ public void onError(Throwable e) {
+ errors.incrementAndGet();
+ }
+ };
+ final RuntimeException ex = new RuntimeException();
+ SafeSubscriber safe = new SafeSubscriber(ts);
+ safe.add(Subscriptions.create(new Action0() {
+ @Override
+ public void call() {
+ throw ex;
+ }
+ }));
+
+ try {
+ safe.onCompleted();
+ Assert.fail();
+ } catch(UnsubscribeFailedException e) {
+ assertEquals(1, (int) calls.get());
+ assertEquals(0, (int) errors.get());
+ }
+ }
+
+ @Test
+ public void testPluginErrorHandlerReceivesExceptionFromFailingUnsubscribeAfterCompletionThrows() {
+ final AtomicInteger calls = new AtomicInteger();
+ RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
+ @Override
+ public void handleError(Throwable e) {
+ calls.incrementAndGet();
+ }
+ });
+
+ final AtomicInteger errors = new AtomicInteger();
+ TestSubscriber ts = new TestSubscriber() {
+
+ @Override
+ public void onCompleted() {
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ errors.incrementAndGet();
+ }
+ };
+ SafeSubscriber safe = new SafeSubscriber(ts);
+ safe.add(Subscriptions.create(new Action0() {
+ @Override
+ public void call() {
+ throw new RuntimeException();
+ }
+ }));
+
+ try {
+ safe.onCompleted();
+ Assert.fail();
+ } catch(UnsubscribeFailedException e) {
+ assertEquals(2, (int) calls.get());
+ assertEquals(0, (int) errors.get());
+ }
+ }
+
+
}