Skip to content

Commit

Permalink
OperatorObserveOn should not request more after child is unsubscribed
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed May 16, 2015
1 parent 425a6f4 commit d73b968
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ void pollQueue() {
counter = 1;
long produced = 0;
long r = requested;
while (!child.isUnsubscribed()) {
boolean isUnsubscribed;
while (!(isUnsubscribed = child.isUnsubscribed())) {
Throwable error;
if (finished) {
if ((error = this.error) != null) {
Expand Down Expand Up @@ -214,6 +215,8 @@ void pollQueue() {
break;
}
}
if (isUnsubscribed)
return;
if (produced > 0 && requested != Long.MAX_VALUE) {
REQUESTED.addAndGet(this, -produced);
}
Expand Down
40 changes: 40 additions & 0 deletions src/test/java/rx/internal/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -765,4 +767,42 @@ public void onNext(Integer t) {

}

@Test
public void testNoMoreRequestsAfterUnsubscribe() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final List<Long> requests = Collections.synchronizedList(new ArrayList<Long>());
Observable.range(1, 1000000)
.doOnRequest(new Action1<Long>() {

@Override
public void call(Long n) {
requests.add(n);
}
})
.observeOn(Schedulers.io())
.subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer t) {
unsubscribe();
latch.countDown();
}
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(1, requests.size());
}

}

0 comments on commit d73b968

Please sign in to comment.