diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index fcce530803..51b82c9a0e 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -15,13 +15,12 @@ import java.util.concurrent.TimeUnit; -import io.reactivex.annotations.Experimental; -import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.*; -import io.reactivex.internal.schedulers.SchedulerWhen; +import io.reactivex.internal.schedulers.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -131,9 +130,11 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull Tim final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); - w.schedule(new DisposeTask(decoratedRun, w), delay, unit); + DisposeTask task = new DisposeTask(decoratedRun, w); - return w; + w.schedule(task, delay, unit); + + return task; } /** @@ -432,10 +433,12 @@ public boolean isDisposed() { } } - static final class DisposeTask implements Runnable { + static final class DisposeTask implements Runnable, Disposable { final Runnable decoratedRun; final Worker w; + Thread runner; + DisposeTask(Runnable decoratedRun, Worker w) { this.decoratedRun = decoratedRun; this.w = w; @@ -443,11 +446,27 @@ static final class DisposeTask implements Runnable { @Override public void run() { + runner = Thread.currentThread(); try { decoratedRun.run(); } finally { + dispose(); + runner = null; + } + } + + @Override + public void dispose() { + if (runner == Thread.currentThread() && w instanceof NewThreadWorker) { + ((NewThreadWorker)w).shutdown(); + } else { w.dispose(); } } + + @Override + public boolean isDisposed() { + return w.isDisposed(); + } } } diff --git a/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java b/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java new file mode 100644 index 0000000000..ef6c57caa7 --- /dev/null +++ b/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java @@ -0,0 +1,80 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.functions.Functions; + +/** + * Base functionality for direct tasks that manage a runnable and cancellation/completion. + * @since 2.0.8 + */ +abstract class AbstractDirectTask +extends AtomicReference> +implements Disposable { + + private static final long serialVersionUID = 1811839108042568751L; + + protected final Runnable runnable; + + protected Thread runner; + + protected static final FutureTask FINISHED = new FutureTask(Functions.EMPTY_RUNNABLE, null); + + protected static final FutureTask DISPOSED = new FutureTask(Functions.EMPTY_RUNNABLE, null); + + public AbstractDirectTask(Runnable runnable) { + this.runnable = runnable; + } + + @Override + public final void dispose() { + Future f = get(); + if (f != FINISHED && f != DISPOSED) { + if (compareAndSet(f, DISPOSED)) { + if (f != null) { + f.cancel(runner != Thread.currentThread()); + } + } + } + } + + @Override + public final boolean isDisposed() { + Future f = get(); + return f == FINISHED || f == DISPOSED; + } + + public final void setFuture(Future future) { + for (;;) { + Future f = get(); + if (f == FINISHED) { + break; + } + if (f == DISPOSED) { + future.cancel(runner != Thread.currentThread()); + break; + } + if (compareAndSet(f, future)) { + break; + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java index dd27bbd778..e050fed1f7 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java @@ -51,8 +51,10 @@ public Disposable scheduleDirect(@NonNull Runnable run) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { if (executor instanceof ExecutorService) { - Future f = ((ExecutorService)executor).submit(decoratedRun); - return Disposables.fromFuture(f); + ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun); + Future f = ((ExecutorService)executor).submit(task); + task.setFuture(f); + return task; } BooleanRunnable br = new BooleanRunnable(decoratedRun); @@ -70,8 +72,10 @@ public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); if (executor instanceof ScheduledExecutorService) { try { - Future f = ((ScheduledExecutorService)executor).schedule(decoratedRun, delay, unit); - return Disposables.fromFuture(f); + ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun); + Future f = ((ScheduledExecutorService)executor).schedule(task, delay, unit); + task.setFuture(f); + return task; } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; @@ -93,8 +97,10 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial if (executor instanceof ScheduledExecutorService) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { - Future f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(decoratedRun, initialDelay, period, unit); - return Disposables.fromFuture(f); + ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun); + Future f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(task, initialDelay, period, unit); + task.setFuture(f); + return task; } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index 12f121135b..93b4a8c8f4 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -60,15 +60,16 @@ public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonN * @return the ScheduledRunnable instance */ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) { - Runnable decoratedRun = RxJavaPlugins.onSchedule(run); + ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run)); try { Future f; - if (delayTime <= 0) { - f = executor.submit(decoratedRun); + if (delayTime <= 0L) { + f = executor.submit(task); } else { - f = executor.schedule(decoratedRun, delayTime, unit); + f = executor.schedule(task, delayTime, unit); } - return Disposables.fromFuture(f); + task.setFuture(f); + return task; } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; @@ -85,10 +86,11 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un * @return the ScheduledRunnable instance */ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) { - Runnable decoratedRun = RxJavaPlugins.onSchedule(run); + ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaPlugins.onSchedule(run)); try { - Future f = executor.scheduleAtFixedRate(decoratedRun, initialDelay, period, unit); - return Disposables.fromFuture(f); + Future f = executor.scheduleAtFixedRate(task, initialDelay, period, unit); + task.setFuture(f); + return task; } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; @@ -145,6 +147,16 @@ public void dispose() { } } + /** + * Shuts down the underlying executor in a non-interrupting fashion. + */ + public void shutdown() { + if (!disposed) { + disposed = true; + executor.shutdown(); + } + } + @Override public boolean isDisposed() { return disposed; diff --git a/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTask.java b/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTask.java new file mode 100644 index 0000000000..080928f722 --- /dev/null +++ b/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTask.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import io.reactivex.plugins.RxJavaPlugins; + +/** + * A Callable to be submitted to an ExecutorService that runs a Runnable + * action periodically and manages completion/cancellation. + * @since 2.0.8 + */ +public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implements Runnable { + + private static final long serialVersionUID = 1811839108042568751L; + + public ScheduledDirectPeriodicTask(Runnable runnable) { + super(runnable); + } + + @Override + public void run() { + runner = Thread.currentThread(); + try { + try { + runnable.run(); + } catch (Throwable ex) { + lazySet(FINISHED); + RxJavaPlugins.onError(ex); + } + } finally { + runner = null; + } + } +} diff --git a/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectTask.java b/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectTask.java new file mode 100644 index 0000000000..44d4ce52e0 --- /dev/null +++ b/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectTask.java @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import java.util.concurrent.Callable; + +/** + * A Callable to be submitted to an ExecutorService that runs a Runnable + * action and manages completion/cancellation. + * @since 2.0.8 + */ +public final class ScheduledDirectTask extends AbstractDirectTask implements Callable { + + private static final long serialVersionUID = 1811839108042568751L; + + public ScheduledDirectTask(Runnable runnable) { + super(runnable); + } + + @Override + public Void call() throws Exception { + runner = Thread.currentThread(); + try { + runnable.run(); + } finally { + lazySet(FINISHED); + runner = null; + } + return null; + } +} diff --git a/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java b/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java index b7fd9a0bff..c942deacfa 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java +++ b/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java @@ -32,6 +32,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray static final int PARENT_INDEX = 0; static final int FUTURE_INDEX = 1; + static final int THREAD_INDEX = 2; /** * Creates a ScheduledRunnable by wrapping the given action and setting @@ -40,7 +41,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray * @param parent the parent tracking container or null if none */ public ScheduledRunnable(Runnable actual, DisposableContainer parent) { - super(2); + super(3); this.actual = actual; this.lazySet(0, parent); } @@ -54,6 +55,7 @@ public Object call() { @Override public void run() { + lazySet(THREAD_INDEX, Thread.currentThread()); try { try { actual.run(); @@ -62,6 +64,7 @@ public void run() { RxJavaPlugins.onError(e); } } finally { + lazySet(THREAD_INDEX, null); Object o = get(PARENT_INDEX); if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) { ((DisposableContainer)o).delete(this); @@ -83,7 +86,7 @@ public void setFuture(Future f) { return; } if (o == DISPOSED) { - f.cancel(true); + f.cancel(get(THREAD_INDEX) != Thread.currentThread()); return; } if (compareAndSet(FUTURE_INDEX, o, f)) { @@ -101,7 +104,7 @@ public void dispose() { } if (compareAndSet(FUTURE_INDEX, o, DISPOSED)) { if (o != null) { - ((Future)o).cancel(true); + ((Future)o).cancel(get(THREAD_INDEX) != Thread.currentThread()); } break; } diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java index 47e7761f89..11b986204b 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java @@ -106,15 +106,16 @@ public Worker createWorker() { @NonNull @Override public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { - Runnable decoratedRun = RxJavaPlugins.onSchedule(run); + ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run)); try { Future f; if (delay <= 0L) { - f = executor.get().submit(decoratedRun); + f = executor.get().submit(task); } else { - f = executor.get().schedule(decoratedRun, delay, unit); + f = executor.get().schedule(task, delay, unit); } - return Disposables.fromFuture(f); + task.setFuture(f); + return task; } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; @@ -124,10 +125,11 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit uni @NonNull @Override public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { - Runnable decoratedRun = RxJavaPlugins.onSchedule(run); + ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaPlugins.onSchedule(run)); try { - Future f = executor.get().scheduleAtFixedRate(decoratedRun, initialDelay, period, unit); - return Disposables.fromFuture(f); + Future f = executor.get().scheduleAtFixedRate(task, initialDelay, period, unit); + task.setFuture(f); + return task; } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; diff --git a/src/test/java/io/reactivex/disposables/FutureDisposableTest.java b/src/test/java/io/reactivex/disposables/FutureDisposableTest.java new file mode 100644 index 0000000000..de7ee68385 --- /dev/null +++ b/src/test/java/io/reactivex/disposables/FutureDisposableTest.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.disposables; + +import java.util.concurrent.FutureTask; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.internal.functions.Functions; + +public class FutureDisposableTest { + + @Test + public void normal() { + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null); + Disposable d = Disposables.fromFuture(ft); + assertFalse(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + + assertTrue(ft.isCancelled()); + } + + @Test + public void interruptible() { + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null); + Disposable d = Disposables.fromFuture(ft, true); + assertFalse(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + + assertTrue(ft.isCancelled()); + } + + @Test + public void normalDone() { + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null); + FutureDisposable d = new FutureDisposable(ft, false); + assertFalse(d.isDisposed()); + + assertFalse(d.isDisposed()); + + ft.run(); + + assertTrue(d.isDisposed()); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableTimerTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableTimerTest.java index cce58a9b50..1d611a6287 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableTimerTest.java @@ -13,12 +13,17 @@ package io.reactivex.internal.operators.completable; -import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import io.reactivex.*; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.functions.Action; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.*; public class CompletableTimerTest { @@ -26,4 +31,37 @@ public class CompletableTimerTest { public void dispose() { TestHelper.checkDisposed(Completable.timer(1, TimeUnit.SECONDS, new TestScheduler())); } + + @Test + public void timerInterruptible() throws Exception { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + final AtomicBoolean interrupted = new AtomicBoolean(); + TestObserver ts = Completable.timer(1, TimeUnit.MILLISECONDS, s) + .doOnComplete(new Action() { + @Override + public void run() throws Exception { + try { + Thread.sleep(3000); + } catch (InterruptedException ex) { + interrupted.set(true); + } + } + }) + .test(); + + Thread.sleep(500); + + ts.cancel(); + + Thread.sleep(500); + + assertTrue(s.getClass().getSimpleName(), interrupted.get()); + } + } finally { + exec.shutdown(); + } + } + } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOtherTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOtherTest.java index 23f9e31cd6..9e23acf0ee 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOtherTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOtherTest.java @@ -12,6 +12,7 @@ */ package io.reactivex.internal.operators.flowable; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.junit.*; @@ -21,6 +22,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; public class FlowableDelaySubscriptionOtherTest { @@ -319,4 +321,30 @@ public Object apply(Flowable o) throws Exception { } }, false, 1, 1, 1); } + + @Test + public void afterDelayNoInterrupt() { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + final TestSubscriber observer = TestSubscriber.create(); + observer.withTag(s.getClass().getSimpleName()); + + Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter emitter) throws Exception { + emitter.onNext(Thread.interrupted()); + emitter.onComplete(); + } + }, BackpressureStrategy.MISSING) + .delaySubscription(100, TimeUnit.MILLISECONDS, s) + .subscribe(observer); + + observer.awaitTerminalEvent(); + observer.assertValue(false); + } + } finally { + exec.shutdown(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java index 6de199bcb2..16052e328d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java @@ -13,12 +13,13 @@ package io.reactivex.internal.operators.flowable; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.*; import org.mockito.*; @@ -27,8 +28,9 @@ import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; +import io.reactivex.functions.Function; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; public class FlowableTimerTest { @@ -341,4 +343,37 @@ public void timerDelayZero() { RxJavaPlugins.reset(); } } + + @Test + public void timerInterruptible() throws Exception { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + final AtomicBoolean interrupted = new AtomicBoolean(); + TestSubscriber ts = Flowable.timer(1, TimeUnit.MILLISECONDS, s) + .map(new Function() { + @Override + public Long apply(Long v) throws Exception { + try { + Thread.sleep(3000); + } catch (InterruptedException ex) { + interrupted.set(true); + } + return v; + } + }) + .test(); + + Thread.sleep(500); + + ts.cancel(); + + Thread.sleep(500); + + assertTrue(s.getClass().getSimpleName(), interrupted.get()); + } + } finally { + exec.shutdown(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeTimerTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeTimerTest.java index 88edf62359..d588be655b 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeTimerTest.java @@ -13,12 +13,17 @@ package io.reactivex.internal.operators.maybe; -import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import io.reactivex.*; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.functions.Function; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.*; public class MaybeTimerTest { @@ -26,4 +31,38 @@ public class MaybeTimerTest { public void dispose() { TestHelper.checkDisposed(Maybe.timer(1, TimeUnit.SECONDS, new TestScheduler())); } + + @Test + public void timerInterruptible() throws Exception { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + final AtomicBoolean interrupted = new AtomicBoolean(); + TestObserver ts = Maybe.timer(1, TimeUnit.MILLISECONDS, s) + .map(new Function() { + @Override + public Long apply(Long v) throws Exception { + try { + Thread.sleep(3000); + } catch (InterruptedException ex) { + interrupted.set(true); + } + return v; + } + }) + .test(); + + Thread.sleep(500); + + ts.cancel(); + + Thread.sleep(500); + + assertTrue(s.getClass().getSimpleName(), interrupted.get()); + } + } finally { + exec.shutdown(); + } + } + } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOtherTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOtherTest.java index fe68ebccf7..566bf0ffb3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOtherTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDelaySubscriptionOtherTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.junit.*; @@ -22,6 +23,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; public class ObservableDelaySubscriptionOtherTest { @@ -200,4 +202,32 @@ public Object apply(Observable o) throws Exception { } }, false, 1, 1, 1); } + + + @Test + public void afterDelayNoInterrupt() { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + final TestObserver observer = TestObserver.create(); + observer.withTag(s.getClass().getSimpleName()); + + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter emitter) throws Exception { + emitter.onNext(Thread.interrupted()); + emitter.onComplete(); + } + }) + .delaySubscription(100, TimeUnit.MILLISECONDS, s) + .subscribe(observer); + + observer.awaitTerminalEvent(); + observer.assertValue(false); + } + } finally { + exec.shutdown(); + } + } + } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java index 5124efc2f1..e94d5c1ec4 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java @@ -17,17 +17,19 @@ import static org.mockito.Mockito.*; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.*; import org.mockito.*; import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.schedulers.*; public class ObservableTimerTest { @Mock @@ -303,4 +305,38 @@ public void timerDelayZero() { RxJavaPlugins.reset(); } } + + @Test + public void timerInterruptible() throws Exception { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + final AtomicBoolean interrupted = new AtomicBoolean(); + TestObserver ts = Observable.timer(1, TimeUnit.MILLISECONDS, s) + .map(new Function() { + @Override + public Long apply(Long v) throws Exception { + try { + Thread.sleep(3000); + } catch (InterruptedException ex) { + interrupted.set(true); + } + return v; + } + }) + .test(); + + Thread.sleep(500); + + ts.cancel(); + + Thread.sleep(500); + + assertTrue(s.getClass().getSimpleName(), interrupted.get()); + } + } finally { + exec.shutdown(); + } + } + } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTimerTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleTimerTest.java index 35e707e0e8..93719ed764 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTimerTest.java @@ -13,12 +13,17 @@ package io.reactivex.internal.operators.single; -import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import io.reactivex.*; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.functions.Function; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.*; public class SingleTimerTest { @@ -26,4 +31,38 @@ public class SingleTimerTest { public void disposed() { TestHelper.checkDisposed(Single.timer(1, TimeUnit.SECONDS, new TestScheduler())); } + + @Test + public void timerInterruptible() throws Exception { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + final AtomicBoolean interrupted = new AtomicBoolean(); + TestObserver ts = Single.timer(1, TimeUnit.MILLISECONDS, s) + .map(new Function() { + @Override + public Long apply(Long v) throws Exception { + try { + Thread.sleep(3000); + } catch (InterruptedException ex) { + interrupted.set(true); + } + return v; + } + }) + .test(); + + Thread.sleep(500); + + ts.cancel(); + + Thread.sleep(500); + + assertTrue(s.getClass().getSimpleName(), interrupted.get()); + } + } finally { + exec.shutdown(); + } + } + } diff --git a/src/test/java/io/reactivex/internal/schedulers/AbstractDirectTaskTest.java b/src/test/java/io/reactivex/internal/schedulers/AbstractDirectTaskTest.java new file mode 100644 index 0000000000..dd77428cd7 --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/AbstractDirectTaskTest.java @@ -0,0 +1,242 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.FutureTask; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.internal.functions.Functions; + +public class AbstractDirectTaskTest { + + @Test + public void cancelSetFuture() { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + private static final long serialVersionUID = 208585707945686116L; + }; + final Boolean[] interrupted = { null }; + + assertFalse(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + interrupted[0] = mayInterruptIfRunning; + return super.cancel(mayInterruptIfRunning); + } + }; + task.setFuture(ft); + + assertTrue(interrupted[0]); + + assertTrue(task.isDisposed()); + } + + @Test + public void cancelSetFutureCurrentThread() { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + private static final long serialVersionUID = 208585707945686116L; + }; + final Boolean[] interrupted = { null }; + + assertFalse(task.isDisposed()); + + task.runner = Thread.currentThread(); + + task.dispose(); + + assertTrue(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + interrupted[0] = mayInterruptIfRunning; + return super.cancel(mayInterruptIfRunning); + } + }; + task.setFuture(ft); + + assertFalse(interrupted[0]); + + assertTrue(task.isDisposed()); + } + + @Test + public void setFutureCancel() { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + private static final long serialVersionUID = 208585707945686116L; + }; + final Boolean[] interrupted = { null }; + + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + interrupted[0] = mayInterruptIfRunning; + return super.cancel(mayInterruptIfRunning); + } + }; + + assertFalse(task.isDisposed()); + + task.setFuture(ft); + + assertFalse(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + assertTrue(interrupted[0]); + } + @Test + public void setFutureCancelSameThread() { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + private static final long serialVersionUID = 208585707945686116L; + }; + final Boolean[] interrupted = { null }; + + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + interrupted[0] = mayInterruptIfRunning; + return super.cancel(mayInterruptIfRunning); + } + }; + assertFalse(task.isDisposed()); + + task.setFuture(ft); + + task.runner = Thread.currentThread(); + + assertFalse(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + assertFalse(interrupted[0]); + } + + @Test + public void finished() { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + private static final long serialVersionUID = 208585707945686116L; + }; + final Boolean[] interrupted = { null }; + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + interrupted[0] = mayInterruptIfRunning; + return super.cancel(mayInterruptIfRunning); + } + }; + + task.set(AbstractDirectTask.FINISHED); + + task.setFuture(ft); + + assertTrue(task.isDisposed()); + + assertNull(interrupted[0]); + + task.dispose(); + + assertTrue(task.isDisposed()); + + assertNull(interrupted[0]); + } + + @Test + public void finishedCancel() { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + private static final long serialVersionUID = 208585707945686116L; + }; + final Boolean[] interrupted = { null }; + FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + interrupted[0] = mayInterruptIfRunning; + return super.cancel(mayInterruptIfRunning); + } + }; + + task.set(AbstractDirectTask.FINISHED); + + assertTrue(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + task.setFuture(ft); + + assertTrue(task.isDisposed()); + + assertNull(interrupted[0]); + + assertTrue(task.isDisposed()); + + assertNull(interrupted[0]); + } + + @Test + public void disposeSetFutureRace() { + for (int i = 0; i < 1000; i++) { + final AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + private static final long serialVersionUID = 208585707945686116L; + }; + + final Boolean[] interrupted = { null }; + final FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + interrupted[0] = mayInterruptIfRunning; + return super.cancel(mayInterruptIfRunning); + } + }; + + Runnable r1 = new Runnable() { + @Override + public void run() { + task.dispose(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + task.setFuture(ft); + } + }; + + TestHelper.race(r1, r2); + } + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTaskTest.java b/src/test/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTaskTest.java new file mode 100644 index 0000000000..cbdc2b7959 --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTaskTest.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; + +public class ScheduledDirectPeriodicTaskTest { + + @Test + public void runnableThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }); + + task.run(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java index 0a93375f86..169767d4c6 100644 --- a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java +++ b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java @@ -217,4 +217,70 @@ public void run() { RxJavaPlugins.reset(); } } + + @Test + public void withoutParentDisposed() { + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null); + run.dispose(); + run.call(); + } + + @Test + public void withParentDisposed() { + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, new CompositeDisposable()); + run.dispose(); + run.call(); + } + + @Test + public void withFutureDisposed() { + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null); + run.setFuture(new FutureTask(Functions.EMPTY_RUNNABLE, null)); + run.dispose(); + run.call(); + } + + @Test + public void withFutureDisposed2() { + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null); + run.dispose(); + run.setFuture(new FutureTask(Functions.EMPTY_RUNNABLE, null)); + run.call(); + } + + @Test + public void withFutureDisposed3() { + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null); + run.dispose(); + run.set(2, Thread.currentThread()); + run.setFuture(new FutureTask(Functions.EMPTY_RUNNABLE, null)); + run.call(); + } + + @Test + public void runFuture() { + for (int i = 0; i < 500; i++) { + CompositeDisposable set = new CompositeDisposable(); + final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set); + set.add(run); + + final FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, null); + + Runnable r1 = new Runnable() { + @Override + public void run() { + run.call(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + run.setFuture(ft); + } + }; + + TestHelper.race(r1, r2); + } + } }