From 2ae2414282fb3c2af70388b357913a90c93514b7 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Wed, 18 Jan 2017 03:30:49 -0800 Subject: [PATCH] Add scheduler creation factories Resolves #4993 This is a pretty vanilla copy from RxJava 1's implementation. Note that I had to tune NewThread scheduler to not be a singleton to support this. We had talked about borrowing from project reactor's APIs for different overloads, let me know if you think we should add more fine-grained controls through these. --- .../schedulers/ComputationScheduler.java | 30 ++++-- .../internal/schedulers/IoScheduler.java | 30 ++++-- .../schedulers/NewThreadScheduler.java | 16 ++-- .../internal/schedulers/SingleScheduler.java | 25 +++-- .../io/reactivex/schedulers/Schedulers.java | 93 ++++++++++++++++++- 5 files changed, 160 insertions(+), 34 deletions(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index 328fe5770f..3f91e665b2 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -15,20 +15,20 @@ */ package io.reactivex.internal.schedulers; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; - import io.reactivex.Scheduler; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + /** * Holds a fixed pool of worker threads and assigns them * to requested Scheduler.Workers in a round-robin fashion. */ public final class ComputationScheduler extends Scheduler { /** This will indicate no pool is active. */ - static final FixedSchedulerPool NONE = new FixedSchedulerPool(0); + static final FixedSchedulerPool NONE; /** Manages a fixed number of workers. */ private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool"; static final RxThreadFactory THREAD_FACTORY; @@ -42,6 +42,7 @@ public final class ComputationScheduler extends Scheduler { static final PoolWorker SHUTDOWN_WORKER; + final ThreadFactory threadFactory; final AtomicReference pool; /** The name of the system property for setting the thread priority for this Scheduler. */ private static final String KEY_COMPUTATION_PRIORITY = "rx2.computation-priority"; @@ -56,6 +57,9 @@ public final class ComputationScheduler extends Scheduler { Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY))); THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); + + NONE = new FixedSchedulerPool(0, THREAD_FACTORY); + NONE.shutdown(); } static int cap(int cpuCount, int paramThreads) { @@ -68,12 +72,12 @@ static final class FixedSchedulerPool { final PoolWorker[] eventLoops; long n; - FixedSchedulerPool(int maxThreads) { + FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) { // initialize event loops this.cores = maxThreads; this.eventLoops = new PoolWorker[maxThreads]; for (int i = 0; i < maxThreads; i++) { - this.eventLoops[i] = new PoolWorker(THREAD_FACTORY); + this.eventLoops[i] = new PoolWorker(threadFactory); } } @@ -98,6 +102,18 @@ public void shutdown() { * count and using least-recent worker selection policy. */ public ComputationScheduler() { + this(THREAD_FACTORY); + } + + /** + * Create a scheduler with pool size equal to the available processor + * count and using least-recent worker selection policy. + * + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + */ + public ComputationScheduler(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; this.pool = new AtomicReference(NONE); start(); } @@ -121,7 +137,7 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo @Override public void start() { - FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS); + FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index 710ab9a04c..8030cd7333 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -16,13 +16,13 @@ package io.reactivex.internal.schedulers; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - import io.reactivex.Scheduler; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + /** * Scheduler that creates and caches a set of thread pools and reuses them if possible. */ @@ -37,6 +37,7 @@ public final class IoScheduler extends Scheduler { private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS; static final ThreadWorker SHUTDOWN_THREAD_WORKER; + final ThreadFactory threadFactory; final AtomicReference pool; /** The name of the system property for setting the thread priority for this Scheduler. */ @@ -44,9 +45,6 @@ public final class IoScheduler extends Scheduler { static final CachedWorkerPool NONE; static { - NONE = new CachedWorkerPool(0, null); - NONE.shutdown(); - SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown")); SHUTDOWN_THREAD_WORKER.dispose(); @@ -56,6 +54,9 @@ public final class IoScheduler extends Scheduler { WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority); + + NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); + NONE.shutdown(); } static final class CachedWorkerPool implements Runnable { @@ -64,11 +65,13 @@ static final class CachedWorkerPool implements Runnable { final CompositeDisposable allWorkers; private final ScheduledExecutorService evictorService; private final Future evictorTask; + private final ThreadFactory threadFactory; - CachedWorkerPool(long keepAliveTime, TimeUnit unit) { + CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L; this.expiringWorkerQueue = new ConcurrentLinkedQueue(); this.allWorkers = new CompositeDisposable(); + this.threadFactory = threadFactory; ScheduledExecutorService evictor = null; Future task = null; @@ -97,7 +100,7 @@ ThreadWorker get() { } // No cached worker found, so create a new one. - ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY); + ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } @@ -143,13 +146,22 @@ void shutdown() { } public IoScheduler() { + this(WORKER_THREAD_FACTORY); + } + + /** + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + */ + public IoScheduler(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; this.pool = new AtomicReference(NONE); start(); } @Override public void start() { - CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT); + CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java index 52f2426f4d..e78f897d7e 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java @@ -18,16 +18,18 @@ import io.reactivex.Scheduler; +import java.util.concurrent.ThreadFactory; + /** * Schedules work on a new thread. */ public final class NewThreadScheduler extends Scheduler { + final ThreadFactory threadFactory; + private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler"; private static final RxThreadFactory THREAD_FACTORY; - private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); - /** The name of the system property for setting the thread priority for this Scheduler. */ private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority"; @@ -38,16 +40,16 @@ public final class NewThreadScheduler extends Scheduler { THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); } - public static NewThreadScheduler instance() { - return INSTANCE; + public NewThreadScheduler() { + this(THREAD_FACTORY); } - private NewThreadScheduler() { - + public NewThreadScheduler(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; } @Override public Worker createWorker() { - return new NewThreadWorker(THREAD_FACTORY); + return new NewThreadWorker(threadFactory); } } diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java index ce65031280..4c11a8926f 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java @@ -12,20 +12,22 @@ */ package io.reactivex.internal.schedulers; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; - import io.reactivex.Scheduler; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.plugins.RxJavaPlugins; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + /** * A scheduler with a shared, single threaded underlying ScheduledExecutorService. * @since 2.0 */ public final class SingleScheduler extends Scheduler { + final ThreadFactory threadFactory; final AtomicReference executor = new AtomicReference(); /** The name of the system property for setting the thread priority for this Scheduler. */ @@ -47,11 +49,20 @@ public final class SingleScheduler extends Scheduler { } public SingleScheduler() { - executor.lazySet(createExecutor()); + this(SINGLE_THREAD_FACTORY); + } + + /** + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + */ + public SingleScheduler(ThreadFactory threadFactory) { + this.threadFactory = ObjectHelper.requireNonNull(threadFactory, "threadFactory was null"); + executor.lazySet(createExecutor(threadFactory)); } - static ScheduledExecutorService createExecutor() { - return SchedulerPoolFactory.create(SINGLE_THREAD_FACTORY); + static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) { + return SchedulerPoolFactory.create(threadFactory); } @Override @@ -66,7 +77,7 @@ public void start() { return; } if (next == null) { - next = createExecutor(); + next = createExecutor(threadFactory); } if (executor.compareAndSet(current, next)) { return; diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index fb74da4f3b..b0972d7c55 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -13,13 +13,14 @@ package io.reactivex.schedulers; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; - import io.reactivex.Scheduler; +import io.reactivex.annotations.Experimental; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.schedulers.*; import io.reactivex.plugins.RxJavaPlugins; +import java.util.concurrent.*; + /** * Static factory methods for returning standard Scheduler instances. *

@@ -58,7 +59,7 @@ static final class IoHolder { } static final class NewThreadHolder { - static final Scheduler DEFAULT = NewThreadScheduler.instance(); + static final Scheduler DEFAULT = new NewThreadScheduler(); } static { @@ -179,6 +180,90 @@ public static Scheduler from(Executor executor) { return new ExecutorScheduler(executor); } + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}. + * @return the created Scheduler instance + */ + @Experimental + public static Scheduler newComputation() { + return new ComputationScheduler(); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()} + * except using {@code threadFactory} for thread creation. + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + * @return the created Scheduler instance + */ + @Experimental + public static Scheduler newComputation(ThreadFactory threadFactory) { + return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null")); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}. + * @return the created Scheduler instance + */ + @Experimental + public static Scheduler newIo() { + return new IoScheduler(); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()} + * except using {@code threadFactory} for thread creation. + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + * @return the created Scheduler instance + */ + @Experimental + public static Scheduler newIo(ThreadFactory threadFactory) { + return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null")); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}. + * @return the created Scheduler instance + */ + @Experimental + public static Scheduler newNewThread() { + return new NewThreadScheduler(); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()} + * except using {@code threadFactory} for thread creation. + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + * @return the created Scheduler instance + */ + @Experimental + public static Scheduler newNewThread(ThreadFactory threadFactory) { + return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null")); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()}. + * @return the created Scheduler instance + */ + @Experimental + public static Scheduler newSingle() { + return new SingleScheduler(); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()} + * except using {@code threadFactory} for thread creation. + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + * @return the created Scheduler instance + */ + @Experimental + public static Scheduler newSingle(ThreadFactory threadFactory) { + return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null")); + } + /** * Shuts down those standard Schedulers which support the SchedulerLifecycle interface. *

The operation is idempotent and thread-safe.