diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java
index d806321bf3..02cb44a1d3 100644
--- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java
+++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java
@@ -48,6 +48,10 @@ public final class IoScheduler extends Scheduler {
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_IO_PRIORITY = "rx2.io-priority";
+ /** The name of the system property for setting the release behaviour for this Scheduler. */
+ private static final String KEY_SCHEDULED_RELEASE = "rx2.io-scheduled-release";
+ static boolean USE_SCHEDULED_RELEASE;
+
static final CachedWorkerPool NONE;
static {
@@ -63,6 +67,8 @@ public final class IoScheduler extends Scheduler {
EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);
+ USE_SCHEDULED_RELEASE = Boolean.getBoolean(KEY_SCHEDULED_RELEASE);
+
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
@@ -200,7 +206,7 @@ public int size() {
return pool.get().allWorkers.size();
}
- static final class EventLoopWorker extends Scheduler.Worker {
+ static final class EventLoopWorker extends Scheduler.Worker implements Runnable {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
@@ -217,12 +223,20 @@ static final class EventLoopWorker extends Scheduler.Worker {
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
-
- // releasing the pool should be the last action
- pool.release(threadWorker);
+ if (USE_SCHEDULED_RELEASE) {
+ threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null);
+ } else {
+ // releasing the pool should be the last action
+ pool.release(threadWorker);
+ }
}
}
+ @Override
+ public void run() {
+ pool.release(threadWorker);
+ }
+
@Override
public boolean isDisposed() {
return once.get();
diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java
index 9e070690b8..dfd6bfe756 100644
--- a/src/main/java/io/reactivex/schedulers/Schedulers.java
+++ b/src/main/java/io/reactivex/schedulers/Schedulers.java
@@ -31,6 +31,8 @@
*
* - {@code rx2.io-keep-alive-time} (long): sets the keep-alive time of the {@link #io()} Scheduler workers, default is {@link IoScheduler#KEEP_ALIVE_TIME_DEFAULT}
* - {@code rx2.io-priority} (int): sets the thread priority of the {@link #io()} Scheduler, default is {@link Thread#NORM_PRIORITY}
+ * - {@code rx2.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
+ * {@link #io()} Scheduler to scheduled, default is {@code false} for eager mode.
* - {@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs
* - {@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}
* - {@code rx2.newthread-priority} (int): sets the thread priority of the {@link #newThread()} Scheduler, default is {@link Thread#NORM_PRIORITY}
@@ -113,6 +115,8 @@ private Schedulers() {
*
* - {@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs
* - {@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}
+ * - {@code rx2.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
+ * {@code #io()} Scheduler to scheduled, default is {@code false} for eager mode.
*
*
* The default value of this scheduler can be overridden at initialization time via the
@@ -129,6 +133,21 @@ private Schedulers() {
*
Operators on the base reactive classes that use this scheduler are marked with the
* @{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#COMPUTATION COMPUTATION})
* annotation.
+ *
+ * When the {@link Scheduler.Worker} is disposed, the underlying worker can be released to the cached worker pool in two modes:
+ *
+ * - In eager mode (default), the underlying worker is returned immediately to the cached worker pool
+ * and can be reused much quicker by operators. The drawback is that if the currently running task doesn't
+ * respond to interruption in time or at all, this may lead to delays or deadlock with the reuse use of the
+ * underlying worker.
+ *
+ * - In scheduled mode (enabled via the system parameter {@code rx2.io-scheduled-release}
+ * set to {@code true}), the underlying worker is returned to the cached worker pool only after the currently running task
+ * has finished. This can help prevent premature reuse of the underlying worker and likely won't lead to delays or
+ * deadlock with such reuses. The drawback is that the delay in release may lead to an excess amount of underlying
+ * workers being created.
+ *
+ *
* @return a {@link Scheduler} meant for computation-bound work
*/
@NonNull
diff --git a/src/test/java/io/reactivex/internal/schedulers/IoScheduledReleaseTest.java b/src/test/java/io/reactivex/internal/schedulers/IoScheduledReleaseTest.java
new file mode 100644
index 0000000000..7eaaa377c0
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/schedulers/IoScheduledReleaseTest.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.internal.schedulers;
+
+import io.reactivex.Completable;
+import io.reactivex.Flowable;
+import io.reactivex.annotations.NonNull;
+import io.reactivex.functions.Function;
+import io.reactivex.schedulers.Schedulers;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class IoScheduledReleaseTest {
+
+ /* This test will be stuck in a deadlock if IoScheduler.USE_SCHEDULED_RELEASE is not set */
+ @Test
+ public void scheduledRelease() {
+ boolean savedScheduledRelease = IoScheduler.USE_SCHEDULED_RELEASE;
+ IoScheduler.USE_SCHEDULED_RELEASE = true;
+ try {
+ Flowable.just("item")
+ .observeOn(Schedulers.io())
+ .firstOrError()
+ .map(new Function() {
+ @Override
+ public String apply(@NonNull final String item) throws Exception {
+ for (int i = 0; i < 50; i++) {
+ Completable.complete()
+ .observeOn(Schedulers.io())
+ .blockingAwait();
+ }
+ return "Done";
+ }
+ })
+ .ignoreElement()
+ .test()
+ .awaitDone(5, TimeUnit.SECONDS)
+ .assertComplete();
+ } finally {
+ IoScheduler.USE_SCHEDULED_RELEASE = savedScheduledRelease;
+ }
+ }
+}