Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Jan 24, 2022
1 parent 6a15408 commit 3a4e5f4
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,24 +335,38 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
try {
final BlockingQueue<Runnable> queue = executor.getQueue();
// force queue policy should only be used with a scaling queue
assert queue instanceof ExecutorScalingQueue;
queue.put(task);
// we need to check again the executor state as it might have been concurrently shut down; in this case
// the executor's workers are shutting down and might have already picked up the task for execution.
if (rejectAfterShutdown && executor.isShutdown()) {
if (executor.remove(task)) {
incrementRejections();
throw newRejectedException(task, executor, true);
if (rejectAfterShutdown) {
if (executor.isShutdown()) {
reject(executor, task);
} else {
put(executor, task);
// we need to check again the executor state as it might have been concurrently shut down; in this case
// the executor's workers are shutting down and might have already picked up the task for execution.
if (executor.isShutdown() && executor.remove(task)) {
reject(executor, task);
}
}
} else {
put(executor, task);
}
}

private void put(ThreadPoolExecutor executor, Runnable task) {
final BlockingQueue<Runnable> queue = executor.getQueue();
// force queue policy should only be used with a scaling queue
assert queue instanceof ExecutorScalingQueue;
try {
queue.put(task);
} catch (final InterruptedException e) {
assert false : "a scaling queue never blocks so a put to it can never be interrupted";
throw new AssertionError(e);
}
}

private void reject(ThreadPoolExecutor executor, Runnable task) {
incrementRejections();
throw newRejectedException(task, executor, true);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,7 @@ public void testScalingThreadPoolRejectAfterShutdown() throws Exception {
if (rejectAfterShutdown) {
final EsRejectedExecutionException exception = expectThrows(
EsRejectedExecutionException.class,
() -> scalingExecutor.execute(() -> {
throw new AssertionError("should be rejected");
})
() -> scalingExecutor.execute(() -> { throw new AssertionError("should be rejected"); })
);
assertThat(exception.getLocalizedMessage(), allOf(containsString("rejected execution of "), containsString("(shutdown)")));
assertThat(exception.isExecutorShutdown(), equalTo(true));
Expand Down Expand Up @@ -361,8 +359,9 @@ public void testScalingThreadPoolRejectDuringShutdown() throws Exception {

assertBusy(() -> assertTrue(scalingExecutor.isTerminated()));
assertThat(scalingExecutor.getCompletedTaskCount(), greaterThanOrEqualTo((long) max));
assertThat(scalingExecutor.getCompletedTaskCount(), lessThanOrEqualTo((long) max + barrier.getParties() - 1L));
assertThat(scalingExecutor.getCompletedTaskCount() + rejected.get(), equalTo((long) max + barrier.getParties() - 1L));
final long maxCompletedTasks = (long) max + barrier.getParties() - 1L;
assertThat(scalingExecutor.getCompletedTaskCount(), lessThanOrEqualTo(maxCompletedTasks));
assertThat(scalingExecutor.getCompletedTaskCount() + rejected.get(), equalTo(maxCompletedTasks));
assertThat(scalingExecutor.getQueue().size(), equalTo(0));
assertThat(scalingExecutor.getActiveCount(), equalTo(0));

Expand Down

0 comments on commit 3a4e5f4

Please sign in to comment.