Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: fix ExecutorScheduler and GenericScheduledExecutorService reorder bug #3760

Merged
merged 1 commit into from
Mar 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,29 @@
* the work asynchronously on the appropriate {@link Scheduler} implementation. This means for example that you would not use this approach
* along with {@link TrampolineScheduler} or {@link ImmediateScheduler}.
*/
public final class GenericScheduledExecutorService implements SchedulerLifecycle{
public final class GenericScheduledExecutorService implements SchedulerLifecycle {

private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

private static final ScheduledExecutorService NONE;
private static final ScheduledExecutorService[] NONE = new ScheduledExecutorService[0];

private static final ScheduledExecutorService SHUTDOWN;
static {
NONE = Executors.newScheduledThreadPool(0);
NONE.shutdownNow();
SHUTDOWN = Executors.newScheduledThreadPool(0);
SHUTDOWN.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ew

I definitely don't like that library will create executor when this class will be loaded. Even without threads used it looks strange.

Maybe return new each time getInstance()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executors don't start their thread until the first task is submitted.

}

/* Schedulers needs acces to this in order to work with the lifecycle. */
public final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();

private final AtomicReference<ScheduledExecutorService> executor;
private final AtomicReference<ScheduledExecutorService[]> executor;

/** We don't use atomics with this because thread-assignment is random anyway. */
private static int roundRobin;

private GenericScheduledExecutorService() {
executor = new AtomicReference<ScheduledExecutorService>(NONE);
executor = new AtomicReference<ScheduledExecutorService[]>(NONE);
start();
}

Expand All @@ -63,39 +68,60 @@ public void start() {
count = 8;
}

ScheduledExecutorService exec = Executors.newScheduledThreadPool(count, THREAD_FACTORY);
if (executor.compareAndSet(NONE, exec)) {
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
if (exec instanceof ScheduledThreadPoolExecutor) {
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
// A multi-threaded executor can reorder tasks, having a set of them
// and handing one of those out on getInstance() ensures a proper order

ScheduledExecutorService[] execs = new ScheduledExecutorService[count];
for (int i = 0; i < count; i++) {
execs[i] = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
}
if (executor.compareAndSet(NONE, execs)) {
for (ScheduledExecutorService exec : execs) {
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
if (exec instanceof ScheduledThreadPoolExecutor) {
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
}
}
}
} else {
exec.shutdownNow();
for (ScheduledExecutorService exec : execs) {
exec.shutdownNow();
}
}
}

@Override
public void shutdown() {
for (;;) {
ScheduledExecutorService exec = executor.get();
if (exec == NONE) {
ScheduledExecutorService[] execs = executor.get();
if (execs == NONE) {
return;
}
if (executor.compareAndSet(exec, NONE)) {
NewThreadWorker.deregisterExecutor(exec);
exec.shutdownNow();
if (executor.compareAndSet(execs, NONE)) {
for (ScheduledExecutorService exec : execs) {
NewThreadWorker.deregisterExecutor(exec);
exec.shutdownNow();
}
return;
}
}
}

/**
* See class Javadoc for information on what this is for and how to use.
* Returns one of the single-threaded ScheduledExecutorService helper executors.
*
* @return {@link ScheduledExecutorService} for generic use.
*/
public static ScheduledExecutorService getInstance() {
return INSTANCE.executor.get();
ScheduledExecutorService[] execs = INSTANCE.executor.get();
if (execs == NONE) {
return SHUTDOWN;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How frequent is this case that we need static final SHUTDOWN?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unknown. Since Schedulers now have lifecycle, this is the correct behavior when some latent sequence tries to schedule something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, ok. Another solution is to make SHUTDOWN lazy via hiding it under separate class, but it will increase methods count, so, I guess, k, let's live with it

}
int r = roundRobin + 1;
if (r >= execs.length) {
r = 0;
}
roundRobin = r;
return execs[r];
}
}
9 changes: 3 additions & 6 deletions src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
final ConcurrentLinkedQueue<ScheduledAction> queue;
final AtomicInteger wip;

final ScheduledExecutorService service;

public ExecutorSchedulerWorker(Executor executor) {
this.executor = executor;
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
this.wip = new AtomicInteger();
this.tasks = new CompositeSubscription();
this.service = GenericScheduledExecutorService.getInstance();
}

@Override
Expand Down Expand Up @@ -108,12 +111,6 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledExecutorService service;
if (executor instanceof ScheduledExecutorService) {
service = (ScheduledExecutorService)executor;
} else {
service = GenericScheduledExecutorService.getInstance();
}

final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package rx.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.*;

public class GenericScheduledExecutorServiceTest {
@Test
public void verifyInstanceIsSingleThreaded() throws Exception {
ScheduledExecutorService exec = GenericScheduledExecutorService.getInstance();

final AtomicInteger state = new AtomicInteger();

final AtomicInteger found1 = new AtomicInteger();
final AtomicInteger found2 = new AtomicInteger();

Future<?> f1 = exec.schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
found1.set(state.getAndSet(1));
}
}, 250, TimeUnit.MILLISECONDS);
Future<?> f2 = exec.schedule(new Runnable() {
@Override
public void run() {
found2.set(state.getAndSet(2));
}
}, 250, TimeUnit.MILLISECONDS);

f1.get();
f2.get();

Assert.assertEquals(2, state.get());
Assert.assertEquals(0, found1.get());
Assert.assertEquals(1, found2.get());
}
}