Skip to content

Commit

Permalink
1.x: fix ExecutorScheduler and GenericScheduledExecutorService reorder
Browse files Browse the repository at this point in the history
bug
  • Loading branch information
akarnokd committed Mar 16, 2016
1 parent 662ce3b commit c36456a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,29 @@
* the work asynchronously on the appropriate {@link Scheduler} implementation. This means for example that you would not use this approach
* along with {@link TrampolineScheduler} or {@link ImmediateScheduler}.
*/
public final class GenericScheduledExecutorService implements SchedulerLifecycle{
public final class GenericScheduledExecutorService implements SchedulerLifecycle {

private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

private static final ScheduledExecutorService NONE;
private static final ScheduledExecutorService[] NONE = new ScheduledExecutorService[0];

private static final ScheduledExecutorService SHUTDOWN;
static {
NONE = Executors.newScheduledThreadPool(0);
NONE.shutdownNow();
SHUTDOWN = Executors.newScheduledThreadPool(0);
SHUTDOWN.shutdown();
}

/* Schedulers needs acces to this in order to work with the lifecycle. */
public final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();

private final AtomicReference<ScheduledExecutorService> executor;
private final AtomicReference<ScheduledExecutorService[]> executor;

/** We don't use atomics with this because thread-assignment is random anyway. */
private static int roundRobin;

private GenericScheduledExecutorService() {
executor = new AtomicReference<ScheduledExecutorService>(NONE);
executor = new AtomicReference<ScheduledExecutorService[]>(NONE);
start();
}

Expand All @@ -63,39 +68,60 @@ public void start() {
count = 8;
}

ScheduledExecutorService exec = Executors.newScheduledThreadPool(count, THREAD_FACTORY);
if (executor.compareAndSet(NONE, exec)) {
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
if (exec instanceof ScheduledThreadPoolExecutor) {
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
// A multi-threaded executor can reorder tasks, having a set of them
// and handing one of those out on getInstance() ensures a proper order

ScheduledExecutorService[] execs = new ScheduledExecutorService[count];
for (int i = 0; i < count; i++) {
execs[i] = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
}
if (executor.compareAndSet(NONE, execs)) {
for (ScheduledExecutorService exec : execs) {
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
if (exec instanceof ScheduledThreadPoolExecutor) {
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
}
}
}
} else {
exec.shutdownNow();
for (ScheduledExecutorService exec : execs) {
exec.shutdownNow();
}
}
}

@Override
public void shutdown() {
for (;;) {
ScheduledExecutorService exec = executor.get();
if (exec == NONE) {
ScheduledExecutorService[] execs = executor.get();
if (execs == NONE) {
return;
}
if (executor.compareAndSet(exec, NONE)) {
NewThreadWorker.deregisterExecutor(exec);
exec.shutdownNow();
if (executor.compareAndSet(execs, NONE)) {
for (ScheduledExecutorService exec : execs) {
NewThreadWorker.deregisterExecutor(exec);
exec.shutdownNow();
}
return;
}
}
}

/**
* See class Javadoc for information on what this is for and how to use.
* Returns one of the single-threaded ScheduledExecutorService helper executors.
*
* @return {@link ScheduledExecutorService} for generic use.
*/
public static ScheduledExecutorService getInstance() {
return INSTANCE.executor.get();
ScheduledExecutorService[] execs = INSTANCE.executor.get();
if (execs == NONE) {
return SHUTDOWN;
}
int r = roundRobin + 1;
if (r >= execs.length) {
r = 0;
}
roundRobin = r;
return execs[r];
}
}
9 changes: 3 additions & 6 deletions src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
final ConcurrentLinkedQueue<ScheduledAction> queue;
final AtomicInteger wip;

final ScheduledExecutorService service;

public ExecutorSchedulerWorker(Executor executor) {
this.executor = executor;
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
this.wip = new AtomicInteger();
this.tasks = new CompositeSubscription();
this.service = GenericScheduledExecutorService.getInstance();
}

@Override
Expand Down Expand Up @@ -108,12 +111,6 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledExecutorService service;
if (executor instanceof ScheduledExecutorService) {
service = (ScheduledExecutorService)executor;
} else {
service = GenericScheduledExecutorService.getInstance();
}

final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package rx.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.*;

public class GenericScheduledExecutorServiceTest {
@Test
public void verifyInstanceIsSingleThreaded() throws Exception {
ScheduledExecutorService exec = GenericScheduledExecutorService.getInstance();

final AtomicInteger state = new AtomicInteger();

final AtomicInteger found1 = new AtomicInteger();
final AtomicInteger found2 = new AtomicInteger();

Future<?> f1 = exec.schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
found1.set(state.getAndSet(1));
}
}, 250, TimeUnit.MILLISECONDS);
Future<?> f2 = exec.schedule(new Runnable() {
@Override
public void run() {
found2.set(state.getAndSet(2));
}
}, 250, TimeUnit.MILLISECONDS);

f1.get();
f2.get();

Assert.assertEquals(2, state.get());
Assert.assertEquals(0, found1.get());
Assert.assertEquals(1, found2.get());
}
}

0 comments on commit c36456a

Please sign in to comment.