Skip to content

Commit

Permalink
Fix Take Early Unsubscription Causing Interrupts
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Nov 7, 2014
1 parent 399cbf9 commit af973b8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
16 changes: 8 additions & 8 deletions src/main/java/rx/internal/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ public void onNext(T i) {
if (!isUnsubscribed()) {
if (++count >= limit) {
completed = true;
// unsubscribe before emitting onNext so shutdown happens before possible effects
// of onNext such as product.request(n) calls be sent upstream.
unsubscribe();
}
child.onNext(i);
if (completed) {
child.onCompleted();
unsubscribe();
}
}
}
Expand All @@ -83,11 +81,13 @@ public void setProducer(final Producer producer) {

@Override
public void request(long n) {
long c = limit - count;
if (n < c) {
producer.request(n);
} else {
producer.request(c);
if (!completed) {
long c = limit - count;
if (n < c) {
producer.request(n);
} else {
producer.request(c);
}
}
}
});
Expand Down
31 changes: 27 additions & 4 deletions src/test/java/rx/internal/operators/OperatorTakeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
Expand All @@ -28,9 +26,11 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.mockito.InOrder;
Expand All @@ -43,7 +43,6 @@
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.internal.operators.OperatorTake;
import rx.observers.Subscribers;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -365,4 +364,28 @@ public void request(long n) {
}).take(1).subscribe(ts);
assertEquals(1, requested.get());
}

@Test
public void testInterrupt() throws InterruptedException {
final AtomicReference<Object> exception = new AtomicReference<Object>();
final CountDownLatch latch = new CountDownLatch(1);
Observable.just(1).subscribeOn(Schedulers.computation()).take(1).subscribe(new Action1<Integer>() {

@Override
public void call(Integer t1) {
try {
Thread.sleep(100);
} catch (Exception e) {
exception.set(e);
e.printStackTrace();
} finally {
latch.countDown();
}
}

});

latch.await();
assertNull(exception.get());
}
}

0 comments on commit af973b8

Please sign in to comment.