Skip to content

Commit

Permalink
Support for scheduled release of threads in Io Scheduler (#7162)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgc-code authored Jan 26, 2021
1 parent 947b05f commit f31aed3
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 4 deletions.
22 changes: 18 additions & 4 deletions src/main/java/io/reactivex/internal/schedulers/IoScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public final class IoScheduler extends Scheduler {
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_IO_PRIORITY = "rx2.io-priority";

/** The name of the system property for setting the release behaviour for this Scheduler. */
private static final String KEY_SCHEDULED_RELEASE = "rx2.io-scheduled-release";
static boolean USE_SCHEDULED_RELEASE;

static final CachedWorkerPool NONE;

static {
Expand All @@ -63,6 +67,8 @@ public final class IoScheduler extends Scheduler {

EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

USE_SCHEDULED_RELEASE = Boolean.getBoolean(KEY_SCHEDULED_RELEASE);

NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
Expand Down Expand Up @@ -200,7 +206,7 @@ public int size() {
return pool.get().allWorkers.size();
}

static final class EventLoopWorker extends Scheduler.Worker {
static final class EventLoopWorker extends Scheduler.Worker implements Runnable {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
Expand All @@ -217,12 +223,20 @@ static final class EventLoopWorker extends Scheduler.Worker {
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();

// releasing the pool should be the last action
pool.release(threadWorker);
if (USE_SCHEDULED_RELEASE) {
threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null);
} else {
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
}

@Override
public void run() {
pool.release(threadWorker);
}

@Override
public boolean isDisposed() {
return once.get();
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
* <ul>
* <li>{@code rx2.io-keep-alive-time} (long): sets the keep-alive time of the {@link #io()} Scheduler workers, default is {@link IoScheduler#KEEP_ALIVE_TIME_DEFAULT}</li>
* <li>{@code rx2.io-priority} (int): sets the thread priority of the {@link #io()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx2.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
* {@link #io()} Scheduler to <em>scheduled</em>, default is {@code false} for <em>eager</em> mode.</li>
* <li>{@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs</li>
* <li>{@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx2.newthread-priority} (int): sets the thread priority of the {@link #newThread()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
Expand Down Expand Up @@ -113,6 +115,8 @@ private Schedulers() {
* <ul>
* <li>{@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs</li>
* <li>{@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx2.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
* {@code #io()} Scheduler to <em>scheduled</em>, default is {@code false} for <em>eager</em> mode.</li>
* </ul>
* <p>
* The default value of this scheduler can be overridden at initialization time via the
Expand All @@ -129,6 +133,21 @@ private Schedulers() {
* <p>Operators on the base reactive classes that use this scheduler are marked with the
* &#64;{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#COMPUTATION COMPUTATION})
* annotation.
* <p>
* When the {@link Scheduler.Worker} is disposed, the underlying worker can be released to the cached worker pool in two modes:
* <ul>
* <li>In <em>eager</em> mode (default), the underlying worker is returned immediately to the cached worker pool
* and can be reused much quicker by operators. The drawback is that if the currently running task doesn't
* respond to interruption in time or at all, this may lead to delays or deadlock with the reuse use of the
* underlying worker.
* </li>
* <li>In <em>scheduled</em> mode (enabled via the system parameter {@code rx2.io-scheduled-release}
* set to {@code true}), the underlying worker is returned to the cached worker pool only after the currently running task
* has finished. This can help prevent premature reuse of the underlying worker and likely won't lead to delays or
* deadlock with such reuses. The drawback is that the delay in release may lead to an excess amount of underlying
* workers being created.
* </li>
* </ul>
* @return a {@link Scheduler} meant for computation-bound work
*/
@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.internal.schedulers;

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class IoScheduledReleaseTest {

/* This test will be stuck in a deadlock if IoScheduler.USE_SCHEDULED_RELEASE is not set */
@Test
public void scheduledRelease() {
boolean savedScheduledRelease = IoScheduler.USE_SCHEDULED_RELEASE;
IoScheduler.USE_SCHEDULED_RELEASE = true;
try {
Flowable.just("item")
.observeOn(Schedulers.io())
.firstOrError()
.map(new Function<String, String>() {
@Override
public String apply(@NonNull final String item) throws Exception {
for (int i = 0; i < 50; i++) {
Completable.complete()
.observeOn(Schedulers.io())
.blockingAwait();
}
return "Done";
}
})
.ignoreElement()
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertComplete();
} finally {
IoScheduler.USE_SCHEDULED_RELEASE = savedScheduledRelease;
}
}
}

0 comments on commit f31aed3

Please sign in to comment.