diff --git a/src/main/java/rx/internal/operators/OperatorTake.java b/src/main/java/rx/internal/operators/OperatorTake.java index b2c62e3920..a53b293a9f 100644 --- a/src/main/java/rx/internal/operators/OperatorTake.java +++ b/src/main/java/rx/internal/operators/OperatorTake.java @@ -63,13 +63,11 @@ public void onNext(T i) { if (!isUnsubscribed()) { if (++count >= limit) { completed = true; - // unsubscribe before emitting onNext so shutdown happens before possible effects - // of onNext such as product.request(n) calls be sent upstream. - unsubscribe(); } child.onNext(i); if (completed) { child.onCompleted(); + unsubscribe(); } } } @@ -83,11 +81,13 @@ public void setProducer(final Producer producer) { @Override public void request(long n) { - long c = limit - count; - if (n < c) { - producer.request(n); - } else { - producer.request(c); + if (!completed) { + long c = limit - count; + if (n < c) { + producer.request(n); + } else { + producer.request(c); + } } } }); diff --git a/src/test/java/rx/internal/operators/OperatorTakeTest.java b/src/test/java/rx/internal/operators/OperatorTakeTest.java index 2aa92fb1b9..6590e73c71 100644 --- a/src/test/java/rx/internal/operators/OperatorTakeTest.java +++ b/src/test/java/rx/internal/operators/OperatorTakeTest.java @@ -15,9 +15,7 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.inOrder; @@ -28,9 +26,11 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; @@ -43,7 +43,6 @@ import rx.Subscription; import rx.functions.Action1; import rx.functions.Func1; -import rx.internal.operators.OperatorTake; import rx.observers.Subscribers; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -365,4 +364,28 @@ public void request(long n) { }).take(1).subscribe(ts); assertEquals(1, requested.get()); } + + @Test + public void testInterrupt() throws InterruptedException { + final AtomicReference exception = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + Observable.just(1).subscribeOn(Schedulers.computation()).take(1).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + try { + Thread.sleep(100); + } catch (Exception e) { + exception.set(e); + e.printStackTrace(); + } finally { + latch.countDown(); + } + } + + }); + + latch.await(); + assertNull(exception.get()); + } }