diff --git a/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java b/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java index 82260207ae..87f7ec5f88 100644 --- a/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java +++ b/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java @@ -31,24 +31,29 @@ * the work asynchronously on the appropriate {@link Scheduler} implementation. This means for example that you would not use this approach * along with {@link TrampolineScheduler} or {@link ImmediateScheduler}. */ -public final class GenericScheduledExecutorService implements SchedulerLifecycle{ +public final class GenericScheduledExecutorService implements SchedulerLifecycle { private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-"; private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); - private static final ScheduledExecutorService NONE; + private static final ScheduledExecutorService[] NONE = new ScheduledExecutorService[0]; + + private static final ScheduledExecutorService SHUTDOWN; static { - NONE = Executors.newScheduledThreadPool(0); - NONE.shutdownNow(); + SHUTDOWN = Executors.newScheduledThreadPool(0); + SHUTDOWN.shutdown(); } /* Schedulers needs acces to this in order to work with the lifecycle. */ public final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService(); - private final AtomicReference executor; + private final AtomicReference executor; + /** We don't use atomics with this because thread-assignment is random anyway. */ + private static int roundRobin; + private GenericScheduledExecutorService() { - executor = new AtomicReference(NONE); + executor = new AtomicReference(NONE); start(); } @@ -63,39 +68,60 @@ public void start() { count = 8; } - ScheduledExecutorService exec = Executors.newScheduledThreadPool(count, THREAD_FACTORY); - if (executor.compareAndSet(NONE, exec)) { - if (!NewThreadWorker.tryEnableCancelPolicy(exec)) { - if (exec instanceof ScheduledThreadPoolExecutor) { - NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec); + // A multi-threaded executor can reorder tasks, having a set of them + // and handing one of those out on getInstance() ensures a proper order + + ScheduledExecutorService[] execs = new ScheduledExecutorService[count]; + for (int i = 0; i < count; i++) { + execs[i] = Executors.newScheduledThreadPool(1, THREAD_FACTORY); + } + if (executor.compareAndSet(NONE, execs)) { + for (ScheduledExecutorService exec : execs) { + if (!NewThreadWorker.tryEnableCancelPolicy(exec)) { + if (exec instanceof ScheduledThreadPoolExecutor) { + NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec); + } } } } else { - exec.shutdownNow(); + for (ScheduledExecutorService exec : execs) { + exec.shutdownNow(); + } } } @Override public void shutdown() { for (;;) { - ScheduledExecutorService exec = executor.get(); - if (exec == NONE) { + ScheduledExecutorService[] execs = executor.get(); + if (execs == NONE) { return; } - if (executor.compareAndSet(exec, NONE)) { - NewThreadWorker.deregisterExecutor(exec); - exec.shutdownNow(); + if (executor.compareAndSet(execs, NONE)) { + for (ScheduledExecutorService exec : execs) { + NewThreadWorker.deregisterExecutor(exec); + exec.shutdownNow(); + } return; } } } /** - * See class Javadoc for information on what this is for and how to use. + * Returns one of the single-threaded ScheduledExecutorService helper executors. * * @return {@link ScheduledExecutorService} for generic use. */ public static ScheduledExecutorService getInstance() { - return INSTANCE.executor.get(); + ScheduledExecutorService[] execs = INSTANCE.executor.get(); + if (execs == NONE) { + return SHUTDOWN; + } + int r = roundRobin + 1; + if (r >= execs.length) { + r = 0; + } + roundRobin = r; + return execs[r]; } } \ No newline at end of file diff --git a/src/main/java/rx/schedulers/ExecutorScheduler.java b/src/main/java/rx/schedulers/ExecutorScheduler.java index d447400184..8e5c9bf22e 100644 --- a/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -54,11 +54,14 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R final ConcurrentLinkedQueue queue; final AtomicInteger wip; + final ScheduledExecutorService service; + public ExecutorSchedulerWorker(Executor executor) { this.executor = executor; this.queue = new ConcurrentLinkedQueue(); this.wip = new AtomicInteger(); this.tasks = new CompositeSubscription(); + this.service = GenericScheduledExecutorService.getInstance(); } @Override @@ -108,12 +111,6 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit if (isUnsubscribed()) { return Subscriptions.unsubscribed(); } - ScheduledExecutorService service; - if (executor instanceof ScheduledExecutorService) { - service = (ScheduledExecutorService)executor; - } else { - service = GenericScheduledExecutorService.getInstance(); - } final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription(); final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); diff --git a/src/test/java/rx/internal/schedulers/GenericScheduledExecutorServiceTest.java b/src/test/java/rx/internal/schedulers/GenericScheduledExecutorServiceTest.java new file mode 100644 index 0000000000..0b90bce072 --- /dev/null +++ b/src/test/java/rx/internal/schedulers/GenericScheduledExecutorServiceTest.java @@ -0,0 +1,43 @@ +package rx.internal.schedulers; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.*; + +public class GenericScheduledExecutorServiceTest { + @Test + public void verifyInstanceIsSingleThreaded() throws Exception { + ScheduledExecutorService exec = GenericScheduledExecutorService.getInstance(); + + final AtomicInteger state = new AtomicInteger(); + + final AtomicInteger found1 = new AtomicInteger(); + final AtomicInteger found2 = new AtomicInteger(); + + Future f1 = exec.schedule(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(250); + } catch (InterruptedException e) { + e.printStackTrace(); + } + found1.set(state.getAndSet(1)); + } + }, 250, TimeUnit.MILLISECONDS); + Future f2 = exec.schedule(new Runnable() { + @Override + public void run() { + found2.set(state.getAndSet(2)); + } + }, 250, TimeUnit.MILLISECONDS); + + f1.get(); + f2.get(); + + Assert.assertEquals(2, state.get()); + Assert.assertEquals(0, found1.get()); + Assert.assertEquals(1, found2.get()); + } +}