diff --git a/src/main/java/rx/subjects/AsyncSubject.java b/src/main/java/rx/subjects/AsyncSubject.java index 74fe558017..26fcc42dc4 100644 --- a/src/main/java/rx/subjects/AsyncSubject.java +++ b/src/main/java/rx/subjects/AsyncSubject.java @@ -15,7 +15,12 @@ */ package rx.subjects; +import java.util.ArrayList; +import java.util.List; + import rx.Observer; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Action1; import rx.internal.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -104,8 +109,24 @@ public void onCompleted() { public void onError(final Throwable e) { if (state.active) { Object n = nl.error(e); + List<Throwable> errors = null; for (SubjectObserver<T> bo : state.terminate(n)) { - bo.onError(e); + try { + bo.onError(e); + } catch (Throwable e2) { + if (errors == null) { + errors = new ArrayList<Throwable>(); + } + errors.add(e2); + } + } + + if (errors != null) { + if (errors.size() == 1) { + Exceptions.propagate(errors.get(0)); + } else { + throw new CompositeException("Errors while emitting AsyncSubject.onError", errors); + } } } } diff --git a/src/main/java/rx/subjects/BehaviorSubject.java b/src/main/java/rx/subjects/BehaviorSubject.java index 6f1da66ba7..cc3650949a 100644 --- a/src/main/java/rx/subjects/BehaviorSubject.java +++ b/src/main/java/rx/subjects/BehaviorSubject.java @@ -16,7 +16,12 @@ package rx.subjects; +import java.util.ArrayList; +import java.util.List; + import rx.Observer; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Action1; import rx.internal.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -131,8 +136,24 @@ public void onError(Throwable e) { Object last = state.get(); if (last == null || state.active) { Object n = nl.error(e); + List<Throwable> errors = null; for (SubjectObserver<T> bo : state.terminate(n)) { - bo.emitNext(n, state.nl); + try { + bo.emitNext(n, state.nl); + } catch (Throwable e2) { + if (errors == null) { + errors = new ArrayList<Throwable>(); + } + errors.add(e2); + } + } + + if (errors != null) { + if (errors.size() == 1) { + Exceptions.propagate(errors.get(0)); + } else { + throw new CompositeException("Errors while emitting AsyncSubject.onError", errors); + } } } } diff --git a/src/main/java/rx/subjects/PublishSubject.java b/src/main/java/rx/subjects/PublishSubject.java index 11689f894d..512c410f80 100644 --- a/src/main/java/rx/subjects/PublishSubject.java +++ b/src/main/java/rx/subjects/PublishSubject.java @@ -15,7 +15,12 @@ */ package rx.subjects; +import java.util.ArrayList; +import java.util.List; + import rx.Observer; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Action1; import rx.internal.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -89,8 +94,23 @@ public void onCompleted() { public void onError(final Throwable e) { if (state.active) { Object n = nl.error(e); + List<Throwable> errors = null; for (SubjectObserver<T> bo : state.terminate(n)) { - bo.emitNext(n, state.nl); + try { + bo.emitNext(n, state.nl); + } catch (Throwable e2) { + if (errors == null) { + errors = new ArrayList<Throwable>(); + } + errors.add(e2); + } + } + if (errors != null) { + if (errors.size() == 1) { + Exceptions.propagate(errors.get(0)); + } else { + throw new CompositeException("Errors while emitting PublishSubject.onError", errors); + } } } } diff --git a/src/main/java/rx/subjects/ReplaySubject.java b/src/main/java/rx/subjects/ReplaySubject.java index 48b59f2a21..819023a9cb 100644 --- a/src/main/java/rx/subjects/ReplaySubject.java +++ b/src/main/java/rx/subjects/ReplaySubject.java @@ -16,11 +16,14 @@ package rx.subjects; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observer; import rx.Scheduler; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Functions; @@ -303,9 +306,25 @@ public void onNext(T t) { public void onError(final Throwable e) { if (ssm.active) { state.error(e); + List<Throwable> errors = null; for (SubjectObserver<? super T> o : ssm.terminate(NotificationLite.instance().error(e))) { - if (caughtUp(o)) { - o.onError(e); + try { + if (caughtUp(o)) { + o.onError(e); + } + } catch (Throwable e2) { + if (errors == null) { + errors = new ArrayList<Throwable>(); + } + errors.add(e2); + } + } + + if (errors != null) { + if (errors.size() == 1) { + Exceptions.propagate(errors.get(0)); + } else { + throw new CompositeException("Errors while emitting ReplaySubject.onError", errors); } } } diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java index a76298a801..33b1e9da02 100644 --- a/src/test/java/rx/ObservableTests.java +++ b/src/test/java/rx/ObservableTests.java @@ -48,7 +48,6 @@ import rx.Observable.OnSubscribe; import rx.Observable.Transformer; import rx.exceptions.OnErrorNotImplementedException; -import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Action2; import rx.functions.Func1; @@ -56,6 +55,8 @@ import rx.observables.ConnectableObservable; import rx.observers.TestSubscriber; import rx.schedulers.TestScheduler; +import rx.subjects.ReplaySubject; +import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; public class ObservableTests { @@ -1125,5 +1126,21 @@ public String call(Integer t1) { ts.assertNoErrors(); ts.assertReceivedOnNext(Arrays.asList("1", "2", "3")); } + + @Test + public void testErrorThrownIssue1685() { + Subject<Object, Object> subject = ReplaySubject.create(); + + Observable.error(new RuntimeException("oops")) + .materialize() + .delay(1, TimeUnit.SECONDS) + .dematerialize() + .subscribe(subject); + + subject.subscribe(); + subject.materialize().toBlocking().first(); + + System.out.println("Done"); + } } diff --git a/src/test/java/rx/subjects/AsyncSubjectTest.java b/src/test/java/rx/subjects/AsyncSubjectTest.java index 1d7c189cae..433531ab6c 100644 --- a/src/test/java/rx/subjects/AsyncSubjectTest.java +++ b/src/test/java/rx/subjects/AsyncSubjectTest.java @@ -16,6 +16,7 @@ package rx.subjects; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.inOrder; @@ -33,7 +34,10 @@ import rx.Observer; import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; +import rx.observers.TestSubscriber; public class AsyncSubjectTest { @@ -281,5 +285,49 @@ public void run() { } } } + + @Test + public void testOnErrorThrowsDoesntPreventDelivery() { + AsyncSubject<String> ps = AsyncSubject.create(); + + ps.subscribe(); + TestSubscriber<String> ts = new TestSubscriber<String>(); + ps.subscribe(ts); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (OnErrorNotImplementedException e) { + // ignore + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } + + /** + * This one has multiple failures so should get a CompositeException + */ + @Test + public void testOnErrorThrowsDoesntPreventDelivery2() { + AsyncSubject<String> ps = AsyncSubject.create(); + + ps.subscribe(); + ps.subscribe(); + TestSubscriber<String> ts = new TestSubscriber<String>(); + ps.subscribe(ts); + ps.subscribe(); + ps.subscribe(); + ps.subscribe(); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (CompositeException e) { + // we should have 5 of them + assertEquals(5, e.getExceptions().size()); + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } } diff --git a/src/test/java/rx/subjects/BehaviorSubjectTest.java b/src/test/java/rx/subjects/BehaviorSubjectTest.java index dc78ebf42a..ec02c226e4 100644 --- a/src/test/java/rx/subjects/BehaviorSubjectTest.java +++ b/src/test/java/rx/subjects/BehaviorSubjectTest.java @@ -16,6 +16,7 @@ package rx.subjects; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -30,7 +31,10 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Func1; +import rx.observers.TestSubscriber; public class BehaviorSubjectTest { @@ -367,4 +371,48 @@ public void testTakeOneSubscriber() { assertEquals(0, source.subscriberCount()); } + + @Test + public void testOnErrorThrowsDoesntPreventDelivery() { + BehaviorSubject<String> ps = BehaviorSubject.create(); + + ps.subscribe(); + TestSubscriber<String> ts = new TestSubscriber<String>(); + ps.subscribe(ts); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (OnErrorNotImplementedException e) { + // ignore + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } + + /** + * This one has multiple failures so should get a CompositeException + */ + @Test + public void testOnErrorThrowsDoesntPreventDelivery2() { + BehaviorSubject<String> ps = BehaviorSubject.create(); + + ps.subscribe(); + ps.subscribe(); + TestSubscriber<String> ts = new TestSubscriber<String>(); + ps.subscribe(ts); + ps.subscribe(); + ps.subscribe(); + ps.subscribe(); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (CompositeException e) { + // we should have 5 of them + assertEquals(5, e.getExceptions().size()); + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } } diff --git a/src/test/java/rx/subjects/PublishSubjectTest.java b/src/test/java/rx/subjects/PublishSubjectTest.java index 20ec300700..2c01d18f54 100644 --- a/src/test/java/rx/subjects/PublishSubjectTest.java +++ b/src/test/java/rx/subjects/PublishSubjectTest.java @@ -16,6 +16,7 @@ package rx.subjects; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -33,8 +34,11 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; import rx.functions.Func1; +import rx.observers.TestSubscriber; public class PublishSubjectTest { @@ -346,4 +350,49 @@ public void onCompleted() { verify(o, never()).onError(any(Throwable.class)); } } + + + @Test + public void testOnErrorThrowsDoesntPreventDelivery() { + PublishSubject<String> ps = PublishSubject.create(); + + ps.subscribe(); + TestSubscriber<String> ts = new TestSubscriber<String>(); + ps.subscribe(ts); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (OnErrorNotImplementedException e) { + // ignore + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } + + /** + * This one has multiple failures so should get a CompositeException + */ + @Test + public void testOnErrorThrowsDoesntPreventDelivery2() { + PublishSubject<String> ps = PublishSubject.create(); + + ps.subscribe(); + ps.subscribe(); + TestSubscriber<String> ts = new TestSubscriber<String>(); + ps.subscribe(ts); + ps.subscribe(); + ps.subscribe(); + ps.subscribe(); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (CompositeException e) { + // we should have 5 of them + assertEquals(5, e.getExceptions().size()); + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } } diff --git a/src/test/java/rx/subjects/ReplaySubjectTest.java b/src/test/java/rx/subjects/ReplaySubjectTest.java index 7c4ae2f8f9..d6ff19ab9c 100644 --- a/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -16,6 +16,7 @@ package rx.subjects; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -36,7 +37,10 @@ import rx.Observer; import rx.Subscriber; import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Func1; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; @@ -589,4 +593,48 @@ public void testReplayTimestampedDirectly() { verify(o).onNext(3); verify(o).onCompleted(); } + + @Test + public void testOnErrorThrowsDoesntPreventDelivery() { + ReplaySubject<String> ps = ReplaySubject.create(); + + ps.subscribe(); + TestSubscriber<String> ts = new TestSubscriber<String>(); + ps.subscribe(ts); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (OnErrorNotImplementedException e) { + // ignore + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } + + /** + * This one has multiple failures so should get a CompositeException + */ + @Test + public void testOnErrorThrowsDoesntPreventDelivery2() { + ReplaySubject<String> ps = ReplaySubject.create(); + + ps.subscribe(); + ps.subscribe(); + TestSubscriber<String> ts = new TestSubscriber<String>(); + ps.subscribe(ts); + ps.subscribe(); + ps.subscribe(); + ps.subscribe(); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (CompositeException e) { + // we should have 5 of them + assertEquals(5, e.getExceptions().size()); + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } }