From 983b6b0cd8f671a181dbcd54d6cd68dd2981bb6c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 29 Dec 2016 19:29:17 +0100 Subject: [PATCH] 2.x: fix timeout with fallback not cancelling the main source --- .../flowable/FlowablePublishMulticast.java | 8 ++-- .../flowable/FlowableTimeoutTimed.java | 1 + .../observable/ObservableTimeoutTimed.java | 1 + .../flowable/FlowablePublishFunctionTest.java | 6 +-- .../flowable/FlowableTimeoutTests.java | 35 +++++++++++++++++ .../FlowableTimeoutWithSelectorTest.java | 38 ++++++++++++++++++- .../observable/ObservableTimeoutTests.java | 34 +++++++++++++++++ .../ObservableTimeoutWithSelectorTest.java | 38 ++++++++++++++++++- 8 files changed, 152 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java index 928c0ffbd7..73b5aa44b6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java @@ -137,7 +137,7 @@ static final class MulticastProcessor extends Flowable implements Subscrib final AtomicReference[]> subscribers; final int prefetch; - + final int limit; final boolean delayError; @@ -150,7 +150,7 @@ static final class MulticastProcessor extends Flowable implements Subscrib volatile boolean done; Throwable error; - + int consumed; @SuppressWarnings("unchecked") @@ -319,11 +319,11 @@ void drain() { int missed = 1; SimpleQueue q = queue; - + int upstreamConsumed = consumed; int localLimit = limit; boolean canRequest = sourceMode != QueueSubscription.SYNC; - + for (;;) { MulticastSubscription[] array = subscribers.get(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java index e0738b951b..b074e10179 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java @@ -174,6 +174,7 @@ public void onComplete() { public void dispose() { worker.dispose(); DisposableHelper.dispose(timer); + s.cancel(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java index e5a4b6b5f6..0c62048555 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java @@ -174,6 +174,7 @@ public void onComplete() { public void dispose() { worker.dispose(); DisposableHelper.dispose(this); + s.dispose(); } @Override diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java index b404f3ddc8..eef409d31e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java @@ -473,7 +473,7 @@ public Publisher apply(Flowable v) throws Exception { public boolean test(Integer w) throws Exception { return w % 2 == 0; } - }), + }), v.filter(new Predicate() { @Override public boolean test(Integer w) throws Exception { @@ -500,7 +500,7 @@ public Publisher apply(Flowable v) throws Exception { public boolean test(Integer w) throws Exception { return w % 2 == 0; } - }), + }), v.filter(new Predicate() { @Override public boolean test(Integer w) throws Exception { @@ -528,7 +528,7 @@ public Publisher apply(Flowable v) throws Exception { public boolean test(Integer w) throws Exception { return w % 2 == 0; } - }), + }), v.filter(new Predicate() { @Override public boolean test(Integer w) throws Exception { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java index 64dc5c912b..f51d1669e3 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java @@ -483,4 +483,39 @@ protected void subscribeActual(Subscriber observer) { } } + + @Test + public void timedTake() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = ps.timeout(1, TimeUnit.DAYS) + .take(1) + .test(); + + assertTrue(ps.hasSubscribers()); + + ps.onNext(1); + + assertFalse(ps.hasSubscribers()); + + to.assertResult(1); + } + + @Test + public void timedFallbackTake() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = ps.timeout(1, TimeUnit.DAYS, Flowable.just(2)) + .take(1) + .test(); + + assertTrue(ps.hasSubscribers()); + + ps.onNext(1); + + assertFalse(ps.hasSubscribers()); + + to.assertResult(1); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java index f08b438647..f0f3772dc8 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.flowable; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -509,4 +509,40 @@ protected void subscribeActual(Subscriber observer) { .test() .assertResult(1); } + + @Test + public void selectorTake() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = ps + .timeout(Functions.justFunction(Flowable.never())) + .take(1) + .test(); + + assertTrue(ps.hasSubscribers()); + + ps.onNext(1); + + assertFalse(ps.hasSubscribers()); + + to.assertResult(1); + } + + @Test + public void selectorFallbackTake() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = ps + .timeout(Functions.justFunction(Flowable.never()), Flowable.just(2)) + .take(1) + .test(); + + assertTrue(ps.hasSubscribers()); + + ps.onNext(1); + + assertFalse(ps.hasSubscribers()); + + to.assertResult(1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java index 20fd73d63b..03b2a45e47 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java @@ -481,4 +481,38 @@ protected void subscribeActual(Observer observer) { RxJavaPlugins.reset(); } } + + @Test + public void timedTake() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.timeout(1, TimeUnit.DAYS) + .take(1) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + assertFalse(ps.hasObservers()); + + to.assertResult(1); + } + + @Test + public void timedFallbackTake() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.timeout(1, TimeUnit.DAYS, Observable.just(2)) + .take(1) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + assertFalse(ps.hasObservers()); + + to.assertResult(1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java index 90bbeb2ace..c99c1cf0d3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.observable; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -510,4 +510,40 @@ protected void subscribeActual(Observer observer) { .test() .assertResult(1); } + + @Test + public void selectorTake() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .timeout(Functions.justFunction(Observable.never())) + .take(1) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + assertFalse(ps.hasObservers()); + + to.assertResult(1); + } + + @Test + public void selectorFallbackTake() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .timeout(Functions.justFunction(Observable.never()), Observable.just(2)) + .take(1) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + assertFalse(ps.hasObservers()); + + to.assertResult(1); + } }