From 80efeb451951efcaea674082a111909fa7eec2b2 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 25 May 2017 10:29:56 +0200 Subject: [PATCH] 2.x: make sure interval+trampoline can be stopped --- .../operators/flowable/FlowableInterval.java | 13 ++++++-- .../flowable/FlowableIntervalRange.java | 13 ++++++-- .../observable/ObservableInterval.java | 13 ++++++-- .../observable/ObservableIntervalRange.java | 13 ++++++-- .../schedulers/TrampolineScheduler.java | 4 +++ .../flowable/FlowableIntervalRangeTest.java | 8 +++++ .../flowable/FlowableIntervalTest.java | 32 +++++++++++++++++++ .../ObservableIntervalRangeTest.java | 8 +++++ .../observable/ObservableIntervalTest.java | 10 +++++- 9 files changed, 105 insertions(+), 9 deletions(-) create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalTest.java diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java index 9d1cd21197..ca97aa34e6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java @@ -19,9 +19,11 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.schedulers.TrampolineScheduler; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; @@ -43,9 +45,16 @@ public void subscribeActual(Subscriber s) { IntervalSubscriber is = new IntervalSubscriber(s); s.onSubscribe(is); - Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit); + Scheduler sch = scheduler; - is.setResource(d); + if (sch instanceof TrampolineScheduler) { + Worker worker = sch.createWorker(); + is.setResource(worker); + worker.schedulePeriodically(is, initialDelay, period, unit); + } else { + Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit); + is.setResource(d); + } } static final class IntervalSubscriber extends AtomicLong diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java index 0092ac1364..fa697dba41 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java @@ -19,9 +19,11 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.schedulers.TrampolineScheduler; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; @@ -47,9 +49,16 @@ public void subscribeActual(Subscriber s) { IntervalRangeSubscriber is = new IntervalRangeSubscriber(s, start, end); s.onSubscribe(is); - Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit); + Scheduler sch = scheduler; - is.setResource(d); + if (sch instanceof TrampolineScheduler) { + Worker worker = sch.createWorker(); + is.setResource(worker); + worker.schedulePeriodically(is, initialDelay, period, unit); + } else { + Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit); + is.setResource(d); + } } static final class IntervalRangeSubscriber extends AtomicLong diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInterval.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInterval.java index 49fc1cb52d..7607b62458 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInterval.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInterval.java @@ -17,8 +17,10 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.*; +import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.schedulers.TrampolineScheduler; public final class ObservableInterval extends Observable { final Scheduler scheduler; @@ -38,9 +40,16 @@ public void subscribeActual(Observer s) { IntervalObserver is = new IntervalObserver(s); s.onSubscribe(is); - Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit); + Scheduler sch = scheduler; - is.setResource(d); + if (sch instanceof TrampolineScheduler) { + Worker worker = sch.createWorker(); + is.setResource(worker); + worker.schedulePeriodically(is, initialDelay, period, unit); + } else { + Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit); + is.setResource(d); + } } static final class IntervalObserver diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableIntervalRange.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableIntervalRange.java index c65da46be4..3fd5396bfd 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableIntervalRange.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableIntervalRange.java @@ -17,8 +17,10 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.*; +import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.schedulers.TrampolineScheduler; public final class ObservableIntervalRange extends Observable { final Scheduler scheduler; @@ -42,9 +44,16 @@ public void subscribeActual(Observer s) { IntervalRangeObserver is = new IntervalRangeObserver(s, start, end); s.onSubscribe(is); - Disposable d = scheduler.schedulePeriodicallyDirect(is, initialDelay, period, unit); + Scheduler sch = scheduler; - is.setResource(d); + if (sch instanceof TrampolineScheduler) { + Worker worker = sch.createWorker(); + is.setResource(worker); + worker.schedulePeriodically(is, initialDelay, period, unit); + } else { + Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit); + is.setResource(d); + } } static final class IntervalRangeObserver diff --git a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java index 25f164081e..2f9a595f6d 100644 --- a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java @@ -100,6 +100,10 @@ Disposable enqueue(Runnable action, long execTime) { int missed = 1; for (;;) { for (;;) { + if (disposed) { + queue.clear(); + return EmptyDisposable.INSTANCE; + } final TimedRunnable polled = queue.poll(); if (polled == null) { break; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java index e0bb3613ef..881acad749 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalRangeTest.java @@ -109,4 +109,12 @@ public void take() { .awaitDone(5, TimeUnit.SECONDS) .assertResult(1L); } + + @Test(timeout = 2000) + public void cancel() { + Flowable.intervalRange(0, 20, 1, 1, TimeUnit.MILLISECONDS, Schedulers.trampoline()) + .take(10) + .test() + .assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalTest.java new file mode 100644 index 0000000000..93824d331b --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIntervalTest.java @@ -0,0 +1,32 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.Flowable; +import io.reactivex.schedulers.Schedulers; + +public class FlowableIntervalTest { + + @Test(timeout = 2000) + public void cancel() { + Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline()) + .take(10) + .test() + .assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalRangeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalRangeTest.java index 4a70f1027a..e7af11c54c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalRangeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalRangeTest.java @@ -76,4 +76,12 @@ public void longOverflow() { public void dispose() { TestHelper.checkDisposed(Observable.intervalRange(1, 2, 1, 1, TimeUnit.MILLISECONDS)); } + + @Test(timeout = 2000) + public void cancel() { + Observable.intervalRange(0, 20, 1, 1, TimeUnit.MILLISECONDS, Schedulers.trampoline()) + .take(10) + .test() + .assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java index 0a164c0c47..ff8a2e149a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java @@ -18,7 +18,7 @@ import org.junit.Test; import io.reactivex.*; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.schedulers.*; public class ObservableIntervalTest { @@ -26,4 +26,12 @@ public class ObservableIntervalTest { public void dispose() { TestHelper.checkDisposed(Observable.interval(1, TimeUnit.MILLISECONDS, new TestScheduler())); } + + @Test(timeout = 2000) + public void cancel() { + Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline()) + .take(10) + .test() + .assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); + } }