Skip to content

Commit

Permalink
Expose scheduler factories which accept thread factories. (#3879)
Browse files Browse the repository at this point in the history
This allows hooks to create schedulers whose threads have different priorities.
  • Loading branch information
JakeWharton authored and akarnokd committed Apr 29, 2016
1 parent 8c82440 commit 5da378f
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 40 deletions.
33 changes: 18 additions & 15 deletions src/main/java/rx/internal/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,39 @@
import rx.subscriptions.*;

public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
static final RxThreadFactory WORKER_THREAD_FACTORY =
new RxThreadFactory(WORKER_THREAD_NAME_PREFIX);

private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
static final RxThreadFactory EVICTOR_THREAD_FACTORY =
new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);

private static final long KEEP_ALIVE_TIME = 60;
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

static final ThreadWorker SHUTDOWN_THREADWORKER;
static {
SHUTDOWN_THREADWORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown-"));
SHUTDOWN_THREADWORKER = new ThreadWorker(RxThreadFactory.NONE);
SHUTDOWN_THREADWORKER.unsubscribe();
}

private static final class CachedWorkerPool {
private final ThreadFactory threadFactory;
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final CompositeSubscription allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;

CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
this.threadFactory = threadFactory;
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeSubscription();

ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread thread = threadFactory.newThread(r);
thread.setName(thread.getName() + " (Evictor)");
return thread;
}
});
NewThreadWorker.tryEnableCancelPolicy(evictor);
task = evictor.scheduleWithFixedDelay(
new Runnable() {
Expand All @@ -83,7 +83,7 @@ ThreadWorker get() {
}

// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
Expand Down Expand Up @@ -131,22 +131,25 @@ void shutdown() {
}
}

final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;

static final CachedWorkerPool NONE;
static {
NONE = new CachedWorkerPool(0, null);
NONE = new CachedWorkerPool(null, 0, null);
NONE.shutdown();
}

public CachedThreadScheduler() {
public CachedThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}

@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
CachedWorkerPool update =
new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/rx/internal/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import rx.internal.util.*;
import rx.subscriptions.*;

public class EventLoopsScheduler extends Scheduler implements SchedulerLifecycle {
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
public final class EventLoopsScheduler extends Scheduler implements SchedulerLifecycle {
/**
* Key to setting the maximum number of computation scheduler threads.
* Zero or less is interpreted as use available. Capped by available.
Expand All @@ -48,7 +45,7 @@ public class EventLoopsScheduler extends Scheduler implements SchedulerLifecycle

static final PoolWorker SHUTDOWN_WORKER;
static {
SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown-"));
SHUTDOWN_WORKER = new PoolWorker(RxThreadFactory.NONE);
SHUTDOWN_WORKER.unsubscribe();
}

Expand All @@ -58,12 +55,12 @@ static final class FixedSchedulerPool {
final PoolWorker[] eventLoops;
long n;

FixedSchedulerPool(int maxThreads) {
FixedSchedulerPool(ThreadFactory threadFactory, int maxThreads) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(THREAD_FACTORY);
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}

Expand All @@ -83,15 +80,17 @@ public void shutdown() {
}
}
/** This will indicate no pool is active. */
static final FixedSchedulerPool NONE = new FixedSchedulerPool(0);
static final FixedSchedulerPool NONE = new FixedSchedulerPool(null, 0);

final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;

/**
* Create a scheduler with pool size equal to the available processor
* count and using least-recent worker selection policy.
*/
public EventLoopsScheduler() {
public EventLoopsScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
start();
}
Expand All @@ -103,7 +102,7 @@ public Worker createWorker() {

@Override
public void start() {
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS);
FixedSchedulerPool update = new FixedSchedulerPool(threadFactory, MAX_THREADS);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/rx/internal/schedulers/NewThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
*/
package rx.internal.schedulers;

import java.util.concurrent.ThreadFactory;
import rx.Scheduler;
import rx.internal.util.RxThreadFactory;

/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {
private final ThreadFactory threadFactory;

private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

public NewThreadScheduler() {
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
return new NewThreadWorker(threadFactory);
}
}
6 changes: 6 additions & 0 deletions src/main/java/rx/internal/util/RxThreadFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
import java.util.concurrent.atomic.AtomicLong;

public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
public static final ThreadFactory NONE = new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
throw new AssertionError("No threads allowed.");
}
};

final String prefix;

public RxThreadFactory(String prefix) {
Expand Down
38 changes: 35 additions & 3 deletions src/main/java/rx/plugins/RxJavaSchedulersHook.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package rx.plugins;

import java.util.concurrent.ThreadFactory;
import rx.Scheduler;
import rx.annotations.Experimental;
import rx.functions.Action0;
import rx.internal.schedulers.CachedThreadScheduler;
import rx.internal.schedulers.EventLoopsScheduler;
import rx.internal.schedulers.NewThreadScheduler;
import rx.internal.util.RxThreadFactory;
import rx.schedulers.Schedulers;

/**
Expand All @@ -45,23 +47,53 @@ public class RxJavaSchedulersHook {
*/
@Experimental
public static Scheduler createComputationScheduler() {
return new EventLoopsScheduler();
return createComputationScheduler(new RxThreadFactory("RxComputationScheduler-"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
* except using {@code threadFactory} for thread creation.
*/
@Experimental
public static Scheduler createComputationScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) throw new NullPointerException("threadFactory == null");
return new EventLoopsScheduler(threadFactory);
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}.
*/
@Experimental
public static Scheduler createIoScheduler() {
return new CachedThreadScheduler();
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
* except using {@code threadFactory} for thread creation.
*/
@Experimental
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) throw new NullPointerException("threadFactory == null");
return new CachedThreadScheduler(threadFactory);
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}.
*/
@Experimental
public static Scheduler createNewThreadScheduler() {
return new NewThreadScheduler();
return createNewThreadScheduler(new RxThreadFactory("RxNewThreadScheduler-"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}
* except using {@code threadFactory} for thread creation.
*/
@Experimental
public static Scheduler createNewThreadScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) throw new NullPointerException("threadFactory == null");
return new NewThreadScheduler(threadFactory);
}

protected RxJavaSchedulersHook() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void call(Integer t1) {
* Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
*/
@Test
public void testObserveOnWithThreadPoolScheduler() {
public void testObserveOnWithComputationScheduler() {
final AtomicInteger count = new AtomicInteger();
final int _multiple = 99;

Expand All @@ -255,7 +255,7 @@ public Integer call(Integer t1) {
@Override
public void call(Integer t1) {
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
assertTrue(Thread.currentThread().getName().startsWith("RxComputationScheduler"));
}

});
Expand Down Expand Up @@ -295,7 +295,7 @@ public Integer call(Integer t1) {
@Override
public void call(Integer t1) {
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
assertTrue(Thread.currentThread().getName().startsWith("RxComputationScheduler"));
}

});
Expand Down
Loading

0 comments on commit 5da378f

Please sign in to comment.