Skip to content

Commit

Permalink
1.x: make Completable.subscribe() report isUnsubscribed consistently
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Feb 15, 2016
1 parent ee9956a commit 00433f3
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1828,12 +1828,13 @@ public final Subscription subscribe() {
subscribe(new CompletableSubscriber() {
@Override
public void onCompleted() {
// nothing to do
mad.unsubscribe();
}

@Override
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
}

@Override
Expand Down Expand Up @@ -1864,11 +1865,13 @@ public void onCompleted() {
} catch (Throwable e) {
ERROR_HANDLER.handleError(e);
}
mad.unsubscribe();
}

@Override
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
}

@Override
Expand Down Expand Up @@ -1900,7 +1903,9 @@ public void onCompleted() {
onComplete.call();
} catch (Throwable e) {
onError(e);
return;
}
mad.unsubscribe();
}

@Override
Expand All @@ -1911,6 +1916,7 @@ public void onError(Throwable e) {
e = new CompositeException(Arrays.asList(e, ex));
ERROR_HANDLER.handleError(e);
}
mad.unsubscribe();
}

@Override
Expand Down
138 changes: 138 additions & 0 deletions src/test/java/rx/CompletableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3604,4 +3604,142 @@ public Completable call(Integer t) {
assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException);
}

@Test
public void subscribeReportsUnsubscribed() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

Subscription completableSubscription = completable.subscribe();

stringSubject.onCompleted();

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
}

@Test
public void subscribeReportsUnsubscribedOnError() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

Subscription completableSubscription = completable.subscribe();

stringSubject.onError(new TestException());

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
}

@Test
public void subscribeActionReportsUnsubscribed() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

Subscription completableSubscription = completable.subscribe(Actions.empty());

stringSubject.onCompleted();

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
}

@Test
public void subscribeActionReportsUnsubscribedAfter() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

final AtomicReference<Subscription> subscriptionRef = new AtomicReference<Subscription>();
Subscription completableSubscription = completable.subscribe(new Action0() {
@Override
public void call() {
if (subscriptionRef.get().isUnsubscribed()) {
subscriptionRef.set(null);
}
}
});
subscriptionRef.set(completableSubscription);

stringSubject.onCompleted();

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
assertNotNull("Unsubscribed before the call to onCompleted", subscriptionRef.get());
}

@Test
public void subscribeActionReportsUnsubscribedOnError() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

Subscription completableSubscription = completable.subscribe(Actions.empty());

stringSubject.onError(new TestException());

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
}

@Test
public void subscribeAction2ReportsUnsubscribed() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

Subscription completableSubscription = completable.subscribe(Actions.empty(), Actions.empty());

stringSubject.onCompleted();

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
}

@Test
public void subscribeAction2ReportsUnsubscribedOnError() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

Subscription completableSubscription = completable.subscribe(Actions.empty(), Actions.empty());

stringSubject.onError(new TestException());

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
}

@Test
public void subscribeAction2ReportsUnsubscribedAfter() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

final AtomicReference<Subscription> subscriptionRef = new AtomicReference<Subscription>();
Subscription completableSubscription = completable.subscribe(Actions.empty(), new Action0() {
@Override
public void call() {
if (subscriptionRef.get().isUnsubscribed()) {
subscriptionRef.set(null);
}
}
});
subscriptionRef.set(completableSubscription);

stringSubject.onCompleted();

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
assertNotNull("Unsubscribed before the call to onCompleted", subscriptionRef.get());
}

@Test
public void subscribeAction2ReportsUnsubscribedOnErrorAfter() {
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();

final AtomicReference<Subscription> subscriptionRef = new AtomicReference<Subscription>();
Subscription completableSubscription = completable.subscribe(new Action1<Throwable>() {
@Override
public void call(Throwable e) {
if (subscriptionRef.get().isUnsubscribed()) {
subscriptionRef.set(null);
}
}
}, Actions.empty());
subscriptionRef.set(completableSubscription);

stringSubject.onError(new TestException());

assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
assertNotNull("Unsubscribed before the call to onError", subscriptionRef.get());
}

}

0 comments on commit 00433f3

Please sign in to comment.