Skip to content

Commit

Permalink
Allow scaling executors to reject tasks after shutdown (elastic#81856)
Browse files Browse the repository at this point in the history
Today scaling thread pools never reject tasks but always add them to the
queue of tasks to execute, even when the thread pool executor is
shutting down or terminated. This behaviour does not work well when a
task is blocked waiting for another task, submitted to another scaling
thread pool, to complete an I/O operation: that other task will never be
executed if it was enqueued just as its scaling thread pool was shutting down.


This situation is more likely to happen with searchable snapshots in
which multiple threads can be blocked waiting for parts of Lucene files
to be fetched and made available in cache. We saw test failures in CI
where Lucene 9 uses concurrent threads (to asynchronously check
indices) that were blocked waiting for cache files to be available and
failing because of leaking file handles (see elastic#77017, elastic#77178).

This pull request changes the `ForceQueuePolicy` used by scaling thread
pools so that it now accepts a `rejectAfterShutdown` flag which can be
set on a per thread pool basis to indicate when tasks should just be
rejected once the thread pool is shut down. Because we rely on many
scaling thread pools to be black holes and never reject tasks, this flag
is set to `false` on most of them to keep the current behavior. In some
cases where the rejection logic was already implemented correctly this
flag has been set to `true`.

This pull request also replaces the interface
`XRejectedExecutionHandler` with an abstract class
`EsRejectedExecutionHandler` so that some of the rejection logic can be
shared between handlers.
  • Loading branch information
tlrx committed Jan 24, 2022
1 parent bc192f2 commit a5dbe1b
Show file tree
Hide file tree
Showing 26 changed files with 432 additions and 85 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/81856.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 81856
summary: Allow scaling executors to reject tasks after shutdown
area: Infra/Core
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
}

public static ExecutorBuilder<?> executorBuilder() {
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 5, TimeValue.timeValueSeconds(30L));
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 5, TimeValue.timeValueSeconds(30L), false);
}

public static ExecutorBuilder<?> nettyEventLoopExecutorBuilder(Settings settings) {
int eventLoopThreads = AzureClientProvider.eventLoopThreadsFromSettings(settings);
return new ScalingExecutorBuilder(NETTY_EVENT_LOOP_THREAD_POOL_NAME, 0, eventLoopThreads, TimeValue.timeValueSeconds(30L));
return new ScalingExecutorBuilder(NETTY_EVENT_LOOP_THREAD_POOL_NAME, 0, eventLoopThreads, TimeValue.timeValueSeconds(30L), false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void testExecutionErrorOnScalingESThreadPoolExecutor() throws Interrupted
1,
10,
TimeUnit.SECONDS,
randomBoolean(),
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext()
);
Expand Down Expand Up @@ -190,6 +191,7 @@ public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws Interru
1,
10,
TimeUnit.SECONDS,
randomBoolean(),
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private static class TrackedCluster {
private final ThreadPool threadPool = new TestThreadPool(
"TrackedCluster",
// a single thread for "client" activities, to limit the number of activities all starting at once
new ScalingExecutorBuilder(CLIENT, 1, 1, TimeValue.ZERO, CLIENT)
new ScalingExecutorBuilder(CLIENT, 1, 1, TimeValue.ZERO, true, CLIENT)
);

private final AtomicBoolean shouldStop = new AtomicBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.core.TimeValue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -36,7 +37,7 @@ public final class EWMATrackingEsThreadPoolExecutor extends EsThreadPoolExecutor
BlockingQueue<Runnable> workQueue,
Function<Runnable, WrappedRunnable> runnableWrapper,
ThreadFactory threadFactory,
XRejectedExecutionHandler handler,
RejectedExecutionHandler handler,
ThreadContext contextHolder
) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.metrics.CounterMetric;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class EsAbortPolicy implements XRejectedExecutionHandler {
private final CounterMetric rejected = new CounterMetric();
public class EsAbortPolicy extends EsRejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Expand All @@ -33,12 +30,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
return;
}
}
rejected.inc();
throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
}

@Override
public long rejected() {
return rejected.count();
incrementRejections();
throw newRejectedException(r, executor, executor.isShutdown());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public static EsThreadPoolExecutor newScaling(
int max,
long keepAliveTime,
TimeUnit unit,
boolean rejectAfterShutdown,
ThreadFactory threadFactory,
ThreadContext contextHolder
) {
Expand All @@ -94,7 +95,7 @@ public static EsThreadPoolExecutor newScaling(
unit,
queue,
threadFactory,
new ForceQueuePolicy(),
new ForceQueuePolicy(rejectAfterShutdown),
contextHolder
);
queue.executor = executor;
Expand Down Expand Up @@ -315,25 +316,57 @@ public boolean offer(E e) {
* A handler for rejected tasks that adds the specified element to this queue,
* waiting if necessary for space to become available.
*/
static class ForceQueuePolicy implements XRejectedExecutionHandler {
static class ForceQueuePolicy extends EsRejectedExecutionHandler {

/**
* This flag is used to indicate if {@link Runnable} should be rejected once the thread pool is shutting down, ie once
* {@link ThreadPoolExecutor#shutdown()} has been called. Scaling thread pools are expected to always handle tasks rejections, even
* after shutdown or termination, but it's not the case of all existing thread pools so this flag allows to keep the previous
* behavior.
*/
private final boolean rejectAfterShutdown;

/**
* @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
*/
ForceQueuePolicy(boolean rejectAfterShutdown) {
this.rejectAfterShutdown = rejectAfterShutdown;
}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
if (rejectAfterShutdown) {
if (executor.isShutdown()) {
reject(executor, task);
} else {
put(executor, task);
// we need to check again the executor state as it might have been concurrently shut down; in this case
// the executor's workers are shutting down and might have already picked up the task for execution.
if (executor.isShutdown() && executor.remove(task)) {
reject(executor, task);
}
}
} else {
put(executor, task);
}
}

private void put(ThreadPoolExecutor executor, Runnable task) {
final BlockingQueue<Runnable> queue = executor.getQueue();
// force queue policy should only be used with a scaling queue
assert queue instanceof ExecutorScalingQueue;
try {
// force queue policy should only be used with a scaling queue
assert executor.getQueue() instanceof ExecutorScalingQueue;
executor.getQueue().put(r);
queue.put(task);
} catch (final InterruptedException e) {
// a scaling queue never blocks so a put to it can never be interrupted
assert false : "a scaling queue never blocks so a put to it can never be interrupted";
throw new AssertionError(e);
}
}

@Override
public long rejected() {
return 0;
private void reject(ThreadPoolExecutor executor, Runnable task) {
incrementRejections();
throw newRejectedException(task, executor, true);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.metrics.CounterMetric;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Base class for {@link RejectedExecutionHandler} implementations that keep a count of the
 * executions they rejected and that build their {@link EsRejectedExecutionException} with a
 * uniform message.
 */
public abstract class EsRejectedExecutionHandler implements RejectedExecutionHandler {

    /** Rejection counter; within this class it is only advanced by {@link #incrementRejections()}. */
    private final CounterMetric rejected = new CounterMetric();

    /**
     * The number of rejected executions.
     *
     * @return the current value of the rejection counter
     */
    public long rejected() {
        return rejected.count();
    }

    /**
     * Records one more rejection in the counter reported by {@link #rejected()}.
     */
    protected void incrementRejections() {
        rejected.inc();
    }

    /**
     * Builds (but does not throw) the exception describing a rejected task.
     *
     * @param r                  the task that is being rejected; rendered into the message
     * @param executor           the executor rejecting the task; rendered into the message
     * @param isExecutorShutdown whether the rejection is reported as caused by a shut down executor; when
     *                           {@code true} the message is suffixed with {@code " (shutdown)"} and the flag
     *                           is forwarded to the exception
     * @return a new {@link EsRejectedExecutionException} carrying the message and the shutdown flag
     */
    protected final EsRejectedExecutionException newRejectedException(Runnable r, ThreadPoolExecutor executor, boolean isExecutorShutdown) {
        String message = "rejected execution of " + r + " on " + executor;
        if (isExecutorShutdown) {
            message += " (shutdown)";
        }
        return new EsRejectedExecutionException(message, isExecutorShutdown);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.core.SuppressForbidden;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -50,7 +51,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
XRejectedExecutionHandler handler,
RejectedExecutionHandler handler,
ThreadContext contextHolder
) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ protected void doStart() {
concurrentConnects,
60,
TimeUnit.SECONDS,
false,
threadFactory,
transportService.getThreadPool().getThreadContext()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
1,
0,
TimeUnit.MILLISECONDS,
true,
daemonThreadFactory(nodeName, DANGLING_INDICES_UPDATE_THREAD_NAME),
threadPool.getThreadContext()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, Ind
1,
0,
TimeUnit.MILLISECONDS,
true,
daemonThreadFactory(nodeName, threadName),
threadPool.getThreadContext()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecuto
private final Setting<Integer> coreSetting;
private final Setting<Integer> maxSetting;
private final Setting<TimeValue> keepAliveSetting;
private final boolean rejectAfterShutdown;

/**
* Construct a scaling executor builder; the settings will have the
Expand All @@ -40,9 +41,16 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecuto
* @param max the maximum number of threads in the pool
* @param keepAlive the time that spare threads above {@code core}
* threads will be kept alive
* @param rejectAfterShutdown set to {@code true} if the executor should reject tasks after shutdown
*/
public ScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive) {
this(name, core, max, keepAlive, "thread_pool." + name);
public ScalingExecutorBuilder(
final String name,
final int core,
final int max,
final TimeValue keepAlive,
final boolean rejectAfterShutdown
) {
this(name, core, max, keepAlive, rejectAfterShutdown, "thread_pool." + name);
}

/**
Expand All @@ -55,12 +63,21 @@ public ScalingExecutorBuilder(final String name, final int core, final int max,
* @param keepAlive the time that spare threads above {@code core}
* threads will be kept alive
* @param prefix the prefix for the settings keys
* @param rejectAfterShutdown set to {@code true} if the executor should reject tasks after shutdown
*/
public ScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive, final String prefix) {
public ScalingExecutorBuilder(
final String name,
final int core,
final int max,
final TimeValue keepAlive,
final boolean rejectAfterShutdown,
final String prefix
) {
super(name);
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
this.rejectAfterShutdown = rejectAfterShutdown;
}

@Override
Expand Down Expand Up @@ -89,6 +106,7 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th
max,
keepAlive.millis(),
TimeUnit.MILLISECONDS,
rejectAfterShutdown,
threadFactory,
threadContext
);
Expand Down
Loading

0 comments on commit a5dbe1b

Please sign in to comment.