From f31aed36fd9d47edf0177164c54c64d4dc28c818 Mon Sep 17 00:00:00 2001 From: Sergio Garcia Date: Tue, 26 Jan 2021 06:47:29 +0000 Subject: [PATCH] Support for scheduled release of threads in Io Scheduler (#7162) --- .../internal/schedulers/IoScheduler.java | 22 +++++-- .../io/reactivex/schedulers/Schedulers.java | 19 ++++++ .../schedulers/IoScheduledReleaseTest.java | 58 +++++++++++++++++++ 3 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 src/test/java/io/reactivex/internal/schedulers/IoScheduledReleaseTest.java diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index d806321bf3..02cb44a1d3 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -48,6 +48,10 @@ public final class IoScheduler extends Scheduler { /** The name of the system property for setting the thread priority for this Scheduler. */ private static final String KEY_IO_PRIORITY = "rx2.io-priority"; + /** The name of the system property for setting the release behaviour for this Scheduler. */ + private static final String KEY_SCHEDULED_RELEASE = "rx2.io-scheduled-release"; + static boolean USE_SCHEDULED_RELEASE; + static final CachedWorkerPool NONE; static { @@ -63,6 +67,8 @@ public final class IoScheduler extends Scheduler { EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority); + USE_SCHEDULED_RELEASE = Boolean.getBoolean(KEY_SCHEDULED_RELEASE); + NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); NONE.shutdown(); } @@ -200,7 +206,7 @@ public int size() { return pool.get().allWorkers.size(); } - static final class EventLoopWorker extends Scheduler.Worker { + static final class EventLoopWorker extends Scheduler.Worker implements Runnable { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; @@ -217,12 +223,20 @@ static final class EventLoopWorker extends Scheduler.Worker { public void dispose() { if (once.compareAndSet(false, true)) { tasks.dispose(); - - // releasing the pool should be the last action - pool.release(threadWorker); + if (USE_SCHEDULED_RELEASE) { + threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null); + } else { + // releasing the pool should be the last action + pool.release(threadWorker); + } } } + @Override + public void run() { + pool.release(threadWorker); + } + @Override public boolean isDisposed() { return once.get(); diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index 9e070690b8..dfd6bfe756 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -31,6 +31,8 @@ *