From d66df7f5c1e3625ff33ea835113c9a55e831cfb8 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 9 Oct 2014 21:29:23 -0700 Subject: [PATCH] Subject Error Handling Fixes https://github.com/ReactiveX/RxJava/issues/1685 by delaying errors that are caught until after all subscribers have a chance to receive the event. Note that this has a lot of code duplication to handle this across the Subject implementations. It may be worth abstracting this logic ... but right now I'm just doing what makes sense to fix this as the Subject abstractions are non-trivial. --- src/main/java/rx/subjects/AsyncSubject.java | 23 ++++++++- .../java/rx/subjects/BehaviorSubject.java | 23 ++++++++- src/main/java/rx/subjects/PublishSubject.java | 22 ++++++++- src/main/java/rx/subjects/ReplaySubject.java | 23 ++++++++- src/test/java/rx/ObservableTests.java | 19 ++++++- .../java/rx/subjects/AsyncSubjectTest.java | 48 ++++++++++++++++++ .../java/rx/subjects/BehaviorSubjectTest.java | 48 ++++++++++++++++++ .../java/rx/subjects/PublishSubjectTest.java | 49 +++++++++++++++++++ .../java/rx/subjects/ReplaySubjectTest.java | 48 ++++++++++++++++++ 9 files changed, 297 insertions(+), 6 deletions(-) diff --git a/src/main/java/rx/subjects/AsyncSubject.java b/src/main/java/rx/subjects/AsyncSubject.java index 74fe558017..26fcc42dc4 100644 --- a/src/main/java/rx/subjects/AsyncSubject.java +++ b/src/main/java/rx/subjects/AsyncSubject.java @@ -15,7 +15,12 @@ */ package rx.subjects; +import java.util.ArrayList; +import java.util.List; + import rx.Observer; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Action1; import rx.internal.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -104,8 +109,24 @@ public void onCompleted() { public void onError(final Throwable e) { if (state.active) { Object n = nl.error(e); + List errors = null; for (SubjectObserver bo : state.terminate(n)) { - bo.onError(e); + try { + bo.onError(e); + } catch (Throwable e2) { + if (errors == null) { + errors = new ArrayList(); + } + errors.add(e2); + } + } + + if (errors != null) { + if (errors.size() == 1) { + Exceptions.propagate(errors.get(0)); + } else { + throw new CompositeException("Errors while emitting AsyncSubject.onError", errors); + } } } } diff --git a/src/main/java/rx/subjects/BehaviorSubject.java b/src/main/java/rx/subjects/BehaviorSubject.java index 6f1da66ba7..cc3650949a 100644 --- a/src/main/java/rx/subjects/BehaviorSubject.java +++ b/src/main/java/rx/subjects/BehaviorSubject.java @@ -16,7 +16,12 @@ package rx.subjects; +import java.util.ArrayList; +import java.util.List; + import rx.Observer; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Action1; import rx.internal.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -131,8 +136,24 @@ public void onError(Throwable e) { Object last = state.get(); if (last == null || state.active) { Object n = nl.error(e); + List errors = null; for (SubjectObserver bo : state.terminate(n)) { - bo.emitNext(n, state.nl); + try { + bo.emitNext(n, state.nl); + } catch (Throwable e2) { + if (errors == null) { + errors = new ArrayList(); + } + errors.add(e2); + } + } + + if (errors != null) { + if (errors.size() == 1) { + Exceptions.propagate(errors.get(0)); + } else { + throw new CompositeException("Errors while emitting AsyncSubject.onError", errors); + } } } } diff --git a/src/main/java/rx/subjects/PublishSubject.java b/src/main/java/rx/subjects/PublishSubject.java index 11689f894d..512c410f80 100644 --- a/src/main/java/rx/subjects/PublishSubject.java +++ b/src/main/java/rx/subjects/PublishSubject.java @@ -15,7 +15,12 @@ */ package rx.subjects; +import java.util.ArrayList; +import java.util.List; + import rx.Observer; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Action1; import rx.internal.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -89,8 +94,23 @@ public void onCompleted() { public void onError(final Throwable e) { if (state.active) { Object n = nl.error(e); + List errors = null; for (SubjectObserver bo : state.terminate(n)) { - bo.emitNext(n, state.nl); + try { + bo.emitNext(n, state.nl); + } catch (Throwable e2) { + if (errors == null) { + errors = new ArrayList(); + } + errors.add(e2); + } + } + if (errors != null) { + if (errors.size() == 1) { + Exceptions.propagate(errors.get(0)); + } else { + throw new CompositeException("Errors while emitting PublishSubject.onError", errors); + } } } } diff --git a/src/main/java/rx/subjects/ReplaySubject.java b/src/main/java/rx/subjects/ReplaySubject.java index 48b59f2a21..819023a9cb 100644 --- a/src/main/java/rx/subjects/ReplaySubject.java +++ b/src/main/java/rx/subjects/ReplaySubject.java @@ -16,11 +16,14 @@ package rx.subjects; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observer; import rx.Scheduler; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Functions; @@ -303,9 +306,25 @@ public void onNext(T t) { public void onError(final Throwable e) { if (ssm.active) { state.error(e); + List errors = null; for (SubjectObserver o : ssm.terminate(NotificationLite.instance().error(e))) { - if (caughtUp(o)) { - o.onError(e); + try { + if (caughtUp(o)) { + o.onError(e); + } + } catch (Throwable e2) { + if (errors == null) { + errors = new ArrayList(); + } + errors.add(e2); + } + } + + if (errors != null) { + if (errors.size() == 1) { + Exceptions.propagate(errors.get(0)); + } else { + throw new CompositeException("Errors while emitting ReplaySubject.onError", errors); } } } diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java index a76298a801..33b1e9da02 100644 --- a/src/test/java/rx/ObservableTests.java +++ b/src/test/java/rx/ObservableTests.java @@ -48,7 +48,6 @@ import rx.Observable.OnSubscribe; import rx.Observable.Transformer; import rx.exceptions.OnErrorNotImplementedException; -import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Action2; import rx.functions.Func1; @@ -56,6 +55,8 @@ import rx.observables.ConnectableObservable; import rx.observers.TestSubscriber; import rx.schedulers.TestScheduler; +import rx.subjects.ReplaySubject; +import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; public class ObservableTests { @@ -1125,5 +1126,21 @@ public String call(Integer t1) { ts.assertNoErrors(); ts.assertReceivedOnNext(Arrays.asList("1", "2", "3")); } + + @Test + public void testErrorThrownIssue1685() { + Subject subject = ReplaySubject.create(); + + Observable.error(new RuntimeException("oops")) + .materialize() + .delay(1, TimeUnit.SECONDS) + .dematerialize() + .subscribe(subject); + + subject.subscribe(); + subject.materialize().toBlocking().first(); + + System.out.println("Done"); + } } diff --git a/src/test/java/rx/subjects/AsyncSubjectTest.java b/src/test/java/rx/subjects/AsyncSubjectTest.java index 1d7c189cae..433531ab6c 100644 --- a/src/test/java/rx/subjects/AsyncSubjectTest.java +++ b/src/test/java/rx/subjects/AsyncSubjectTest.java @@ -16,6 +16,7 @@ package rx.subjects; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.inOrder; @@ -33,7 +34,10 @@ import rx.Observer; import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; +import rx.observers.TestSubscriber; public class AsyncSubjectTest { @@ -281,5 +285,49 @@ public void run() { } } } + + @Test + public void testOnErrorThrowsDoesntPreventDelivery() { + AsyncSubject ps = AsyncSubject.create(); + + ps.subscribe(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribe(ts); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (OnErrorNotImplementedException e) { + // ignore + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } + + /** + * This one has multiple failures so should get a CompositeException + */ + @Test + public void testOnErrorThrowsDoesntPreventDelivery2() { + AsyncSubject ps = AsyncSubject.create(); + + ps.subscribe(); + ps.subscribe(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribe(ts); + ps.subscribe(); + ps.subscribe(); + ps.subscribe(); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (CompositeException e) { + // we should have 5 of them + assertEquals(5, e.getExceptions().size()); + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } } diff --git a/src/test/java/rx/subjects/BehaviorSubjectTest.java b/src/test/java/rx/subjects/BehaviorSubjectTest.java index dc78ebf42a..ec02c226e4 100644 --- a/src/test/java/rx/subjects/BehaviorSubjectTest.java +++ b/src/test/java/rx/subjects/BehaviorSubjectTest.java @@ -16,6 +16,7 @@ package rx.subjects; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -30,7 +31,10 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Func1; +import rx.observers.TestSubscriber; public class BehaviorSubjectTest { @@ -367,4 +371,48 @@ public void testTakeOneSubscriber() { assertEquals(0, source.subscriberCount()); } + + @Test + public void testOnErrorThrowsDoesntPreventDelivery() { + BehaviorSubject ps = BehaviorSubject.create(); + + ps.subscribe(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribe(ts); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (OnErrorNotImplementedException e) { + // ignore + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } + + /** + * This one has multiple failures so should get a CompositeException + */ + @Test + public void testOnErrorThrowsDoesntPreventDelivery2() { + BehaviorSubject ps = BehaviorSubject.create(); + + ps.subscribe(); + ps.subscribe(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribe(ts); + ps.subscribe(); + ps.subscribe(); + ps.subscribe(); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (CompositeException e) { + // we should have 5 of them + assertEquals(5, e.getExceptions().size()); + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } } diff --git a/src/test/java/rx/subjects/PublishSubjectTest.java b/src/test/java/rx/subjects/PublishSubjectTest.java index 20ec300700..2c01d18f54 100644 --- a/src/test/java/rx/subjects/PublishSubjectTest.java +++ b/src/test/java/rx/subjects/PublishSubjectTest.java @@ -16,6 +16,7 @@ package rx.subjects; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -33,8 +34,11 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; import rx.functions.Func1; +import rx.observers.TestSubscriber; public class PublishSubjectTest { @@ -346,4 +350,49 @@ public void onCompleted() { verify(o, never()).onError(any(Throwable.class)); } } + + + @Test + public void testOnErrorThrowsDoesntPreventDelivery() { + PublishSubject ps = PublishSubject.create(); + + ps.subscribe(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribe(ts); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (OnErrorNotImplementedException e) { + // ignore + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } + + /** + * This one has multiple failures so should get a CompositeException + */ + @Test + public void testOnErrorThrowsDoesntPreventDelivery2() { + PublishSubject ps = PublishSubject.create(); + + ps.subscribe(); + ps.subscribe(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribe(ts); + ps.subscribe(); + ps.subscribe(); + ps.subscribe(); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (CompositeException e) { + // we should have 5 of them + assertEquals(5, e.getExceptions().size()); + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } } diff --git a/src/test/java/rx/subjects/ReplaySubjectTest.java b/src/test/java/rx/subjects/ReplaySubjectTest.java index 7c4ae2f8f9..d6ff19ab9c 100644 --- a/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -16,6 +16,7 @@ package rx.subjects; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -36,7 +37,10 @@ import rx.Observer; import rx.Subscriber; import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Func1; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; @@ -589,4 +593,48 @@ public void testReplayTimestampedDirectly() { verify(o).onNext(3); verify(o).onCompleted(); } + + @Test + public void testOnErrorThrowsDoesntPreventDelivery() { + ReplaySubject ps = ReplaySubject.create(); + + ps.subscribe(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribe(ts); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (OnErrorNotImplementedException e) { + // ignore + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } + + /** + * This one has multiple failures so should get a CompositeException + */ + @Test + public void testOnErrorThrowsDoesntPreventDelivery2() { + ReplaySubject ps = ReplaySubject.create(); + + ps.subscribe(); + ps.subscribe(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribe(ts); + ps.subscribe(); + ps.subscribe(); + ps.subscribe(); + + try { + ps.onError(new RuntimeException("an exception")); + fail("expect OnErrorNotImplementedException"); + } catch (CompositeException e) { + // we should have 5 of them + assertEquals(5, e.getExceptions().size()); + } + // even though the onError above throws we should still receive it on the other subscriber + assertEquals(1, ts.getOnErrorEvents().size()); + } }