From 00433f3960bcce6cdb13059418059982a6905a2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 15 Feb 2016 01:19:18 +0100 Subject: [PATCH] 1.x: make Completable.subscribe() report isUnsubscribed consistently --- src/main/java/rx/Completable.java | 8 +- src/test/java/rx/CompletableTest.java | 138 ++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index 2b0940afe1..b71ee03e20 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -1828,12 +1828,13 @@ public final Subscription subscribe() { subscribe(new CompletableSubscriber() { @Override public void onCompleted() { - // nothing to do + mad.unsubscribe(); } @Override public void onError(Throwable e) { ERROR_HANDLER.handleError(e); + mad.unsubscribe(); } @Override @@ -1864,11 +1865,13 @@ public void onCompleted() { } catch (Throwable e) { ERROR_HANDLER.handleError(e); } + mad.unsubscribe(); } @Override public void onError(Throwable e) { ERROR_HANDLER.handleError(e); + mad.unsubscribe(); } @Override @@ -1900,7 +1903,9 @@ public void onCompleted() { onComplete.call(); } catch (Throwable e) { onError(e); + return; } + mad.unsubscribe(); } @Override @@ -1911,6 +1916,7 @@ public void onError(Throwable e) { e = new CompositeException(Arrays.asList(e, ex)); ERROR_HANDLER.handleError(e); } + mad.unsubscribe(); } @Override diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index 6261d18f93..97c169c4f5 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -3604,4 +3604,142 @@ public Completable call(Integer t) { assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException); } + @Test + public void subscribeReportsUnsubscribed() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + Subscription completableSubscription = completable.subscribe(); + + stringSubject.onCompleted(); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + } + + @Test + public void subscribeReportsUnsubscribedOnError() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + Subscription completableSubscription = completable.subscribe(); + + stringSubject.onError(new TestException()); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + } + + @Test + public void subscribeActionReportsUnsubscribed() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + Subscription completableSubscription = completable.subscribe(Actions.empty()); + + stringSubject.onCompleted(); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + } + + @Test + public void subscribeActionReportsUnsubscribedAfter() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + final AtomicReference subscriptionRef = new AtomicReference(); + Subscription completableSubscription = completable.subscribe(new Action0() { + @Override + public void call() { + if (subscriptionRef.get().isUnsubscribed()) { + subscriptionRef.set(null); + } + } + }); + subscriptionRef.set(completableSubscription); + + stringSubject.onCompleted(); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + assertNotNull("Unsubscribed before the call to onCompleted", subscriptionRef.get()); + } + + @Test + public void subscribeActionReportsUnsubscribedOnError() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + Subscription completableSubscription = completable.subscribe(Actions.empty()); + + stringSubject.onError(new TestException()); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + } + + @Test + public void subscribeAction2ReportsUnsubscribed() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + Subscription completableSubscription = completable.subscribe(Actions.empty(), Actions.empty()); + + stringSubject.onCompleted(); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + } + + @Test + public void subscribeAction2ReportsUnsubscribedOnError() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + Subscription completableSubscription = completable.subscribe(Actions.empty(), Actions.empty()); + + stringSubject.onError(new TestException()); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + } + + @Test + public void subscribeAction2ReportsUnsubscribedAfter() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + final AtomicReference subscriptionRef = new AtomicReference(); + Subscription completableSubscription = completable.subscribe(Actions.empty(), new Action0() { + @Override + public void call() { + if (subscriptionRef.get().isUnsubscribed()) { + subscriptionRef.set(null); + } + } + }); + subscriptionRef.set(completableSubscription); + + stringSubject.onCompleted(); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + assertNotNull("Unsubscribed before the call to onCompleted", subscriptionRef.get()); + } + + @Test + public void subscribeAction2ReportsUnsubscribedOnErrorAfter() { + PublishSubject stringSubject = PublishSubject.create(); + Completable completable = stringSubject.toCompletable(); + + final AtomicReference subscriptionRef = new AtomicReference(); + Subscription completableSubscription = completable.subscribe(new Action1() { + @Override + public void call(Throwable e) { + if (subscriptionRef.get().isUnsubscribed()) { + subscriptionRef.set(null); + } + } + }, Actions.empty()); + subscriptionRef.set(completableSubscription); + + stringSubject.onError(new TestException()); + + assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed()); + assertNotNull("Unsubscribed before the call to onError", subscriptionRef.get()); + } + } \ No newline at end of file