diff --git a/docs/changelog/81856.yaml b/docs/changelog/81856.yaml new file mode 100644 index 0000000000000..00b898c57cd8d --- /dev/null +++ b/docs/changelog/81856.yaml @@ -0,0 +1,5 @@ +pr: 81856 +summary: Allow scaling executors to reject tasks after shutdown +area: Infra/Core +type: enhancement +issues: [] diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index b44c1e537a1a8..c3fa8d866426f 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -125,12 +125,12 @@ public List> getExecutorBuilders(Settings settingsToUse) { } public static ExecutorBuilder executorBuilder() { - return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 5, TimeValue.timeValueSeconds(30L)); + return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 5, TimeValue.timeValueSeconds(30L), false); } public static ExecutorBuilder nettyEventLoopExecutorBuilder(Settings settings) { int eventLoopThreads = AzureClientProvider.eventLoopThreadsFromSettings(settings); - return new ScalingExecutorBuilder(NETTY_EVENT_LOOP_THREAD_POOL_NAME, 0, eventLoopThreads, TimeValue.timeValueSeconds(30L)); + return new ScalingExecutorBuilder(NETTY_EVENT_LOOP_THREAD_POOL_NAME, 0, eventLoopThreads, TimeValue.timeValueSeconds(30L), false); } @Override diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index 25046108a98b2..150a353440bdf 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -80,6 +80,7 @@ public 
void testExecutionErrorOnScalingESThreadPoolExecutor() throws Interrupted 1, 10, TimeUnit.SECONDS, + randomBoolean(), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext() ); @@ -208,6 +209,7 @@ public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws Interru 1, 10, TimeUnit.SECONDS, + randomBoolean(), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext() ); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java index ab2f0c1cb3887..55e3caa538797 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java @@ -213,7 +213,7 @@ private static class TrackedCluster { private final ThreadPool threadPool = new TestThreadPool( "TrackedCluster", // a single thread for "client" activities, to limit the number of activities all starting at once - new ScalingExecutorBuilder(CLIENT, 1, 1, TimeValue.ZERO, CLIENT) + new ScalingExecutorBuilder(CLIENT, 1, 1, TimeValue.ZERO, true, CLIENT) ); private final AtomicBoolean shouldStop = new AtomicBoolean(); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java index d770e5547b604..192769591aded 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java @@ -8,13 +8,10 @@ package org.elasticsearch.common.util.concurrent; -import org.elasticsearch.common.metrics.CounterMetric; - import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; -public class EsAbortPolicy implements XRejectedExecutionHandler { - private final CounterMetric rejected = 
new CounterMetric(); +public class EsAbortPolicy extends EsRejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { @@ -33,12 +30,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { return; } } - rejected.inc(); - throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown()); - } - - @Override - public long rejected() { - return rejected.count(); + incrementRejections(); + throw newRejectedException(r, executor, executor.isShutdown()); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index f5ce1f9898e2c..69c0b06d04cf7 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -116,6 +116,7 @@ public static EsThreadPoolExecutor newScaling( int max, long keepAliveTime, TimeUnit unit, + boolean rejectAfterShutdown, ThreadFactory threadFactory, ThreadContext contextHolder ) { @@ -128,7 +129,7 @@ public static EsThreadPoolExecutor newScaling( unit, queue, threadFactory, - new ForceQueuePolicy(), + new ForceQueuePolicy(rejectAfterShutdown), contextHolder ); queue.executor = executor; @@ -380,25 +381,57 @@ public boolean offer(E e) { * A handler for rejected tasks that adds the specified element to this queue, * waiting if necessary for space to become available. */ - static class ForceQueuePolicy implements XRejectedExecutionHandler { + static class ForceQueuePolicy extends EsRejectedExecutionHandler { + + /** + * This flag is used to indicate if {@link Runnable} should be rejected once the thread pool is shutting down, ie once + * {@link ThreadPoolExecutor#shutdown()} has been called. 
Scaling thread pools are expected to always handle tasks rejections, even + * after shutdown or termination, but it's not the case of all existing thread pools so this flag allows to keep the previous + * behavior. + */ + private final boolean rejectAfterShutdown; + + /** + * @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down + */ + ForceQueuePolicy(boolean rejectAfterShutdown) { + this.rejectAfterShutdown = rejectAfterShutdown; + } @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { + if (rejectAfterShutdown) { + if (executor.isShutdown()) { + reject(executor, task); + } else { + put(executor, task); + // we need to check again the executor state as it might have been concurrently shut down; in this case + // the executor's workers are shutting down and might have already picked up the task for execution. + if (executor.isShutdown() && executor.remove(task)) { + reject(executor, task); + } + } + } else { + put(executor, task); + } + } + + private void put(ThreadPoolExecutor executor, Runnable task) { + final BlockingQueue queue = executor.getQueue(); + // force queue policy should only be used with a scaling queue + assert queue instanceof ExecutorScalingQueue; try { - // force queue policy should only be used with a scaling queue - assert executor.getQueue() instanceof ExecutorScalingQueue; - executor.getQueue().put(r); + queue.put(task); } catch (final InterruptedException e) { - // a scaling queue never blocks so a put to it can never be interrupted + assert false : "a scaling queue never blocks so a put to it can never be interrupted"; throw new AssertionError(e); } } - @Override - public long rejected() { - return 0; + private void reject(ThreadPoolExecutor executor, Runnable task) { + incrementRejections(); + throw newRejectedException(task, executor, true); } - } } diff --git 
a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionHandler.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionHandler.java new file mode 100644 index 0000000000000..7ef75e4fee41a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionHandler.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.metrics.CounterMetric; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +public abstract class EsRejectedExecutionHandler implements RejectedExecutionHandler { + + private final CounterMetric rejected = new CounterMetric(); + + /** + * The number of rejected executions. 
+ */ + public long rejected() { + return rejected.count(); + } + + protected void incrementRejections() { + rejected.inc(); + } + + protected final EsRejectedExecutionException newRejectedException(Runnable r, ThreadPoolExecutor executor, boolean isExecutorShutdown) { + final StringBuilder builder = new StringBuilder("rejected execution of ").append(r).append(" on ").append(executor); + if (isExecutorShutdown) { + builder.append(" (shutdown)"); + } + return new EsRejectedExecutionException(builder.toString(), isExecutorShutdown); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index aa42f2deed7ac..ffe9bb5cd445c 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -11,6 +11,7 @@ import org.elasticsearch.core.SuppressForbidden; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -56,7 +57,7 @@ final String getName() { TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, - XRejectedExecutionHandler handler, + RejectedExecutionHandler handler, ThreadContext contextHolder ) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index c6edcd7abd1bc..648b367e92a0a 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ 
b/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -60,7 +60,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto final int tasksPerFrame, TimeValue targetedResponseTime, ThreadFactory threadFactory, - XRejectedExecutionHandler handler, + EsRejectedExecutionHandler handler, ThreadContext contextHolder ) { super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java deleted file mode 100644 index a2051ac52011a..0000000000000 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.common.util.concurrent; - -import java.util.concurrent.RejectedExecutionHandler; - -public interface XRejectedExecutionHandler extends RejectedExecutionHandler { - - /** - * The number of rejected executions. 
- */ - long rejected(); -} diff --git a/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java b/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java index 34fec57529c6f..3e569568c269b 100644 --- a/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java +++ b/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java @@ -215,6 +215,7 @@ protected void doStart() { concurrentConnects, 60, TimeUnit.SECONDS, + false, threadFactory, transportService.getThreadPool().getThreadContext() ) diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 28830fd06c120..e6a839125ab58 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -135,6 +135,7 @@ public UnicastZenPing( concurrentConnects, 60, TimeUnit.SECONDS, + false, threadFactory, threadPool.getThreadContext() ); diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 860cc1f2536d0..8bbd587615a29 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -365,6 +365,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon 1, 0, TimeUnit.MILLISECONDS, + true, daemonThreadFactory(nodeName, DANGLING_INDICES_UPDATE_THREAD_NAME), threadPool.getThreadContext() ) diff --git a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java index 87c1ba4c3618a..974a255c29487 100644 --- a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java +++ 
b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java @@ -70,6 +70,7 @@ public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, Ind 1, 0, TimeUnit.MILLISECONDS, + true, daemonThreadFactory(nodeName, threadName), threadPool.getThreadContext() ); diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java index 69e9978d2c891..07504bc5f9d2e 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java @@ -30,6 +30,7 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder coreSetting; private final Setting maxSetting; private final Setting keepAliveSetting; + private final boolean rejectAfterShutdown; /** * Construct a scaling executor builder; the settings will have the @@ -40,9 +41,16 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder... customBui final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors); final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors); final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512); - builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); + builders.put( + Names.GENERIC, + new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false) + ); builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); @@ -214,27 +217,33 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... 
customBui ); builders.put( Names.MANAGEMENT, - new ScalingExecutorBuilder(Names.MANAGEMENT, 1, boundedBy(allocatedProcessors, 1, 5), TimeValue.timeValueMinutes(5)) + new ScalingExecutorBuilder(Names.MANAGEMENT, 1, boundedBy(allocatedProcessors, 1, 5), TimeValue.timeValueMinutes(5), false) ); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, true)); - builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); - builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); - builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); - builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); + builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5), false)); + builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5), false)); + builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5), false)); + builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5), false)); builders.put( Names.SNAPSHOT_META, - new ScalingExecutorBuilder(Names.SNAPSHOT_META, 1, Math.min(allocatedProcessors * 3, 50), TimeValue.timeValueSeconds(30L)) + new ScalingExecutorBuilder( + Names.SNAPSHOT_META, + 1, + Math.min(allocatedProcessors * 3, 50), + TimeValue.timeValueSeconds(30L), + false + ) ); builders.put( Names.FETCH_SHARD_STARTED, - new 
ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)) + new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false) ); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); builders.put( Names.FETCH_SHARD_STORE, - new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)) + new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false) ); builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false)); builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false)); @@ -367,8 +376,8 @@ public ThreadPoolStats stats() { largest = threadPoolExecutor.getLargestPoolSize(); completed = threadPoolExecutor.getCompletedTaskCount(); RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); - if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) { - rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected(); + if (rejectedExecutionHandler instanceof EsRejectedExecutionHandler) { + rejected = ((EsRejectedExecutionHandler) rejectedExecutionHandler).rejected(); } } stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed)); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index de380a6bac4d5..360b7fc5afa05 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -168,6 +168,7 @@ public void testScaleUp() throws Exception { max, between(1, 100), 
randomTimeUnit(), + randomBoolean(), EsExecutors.daemonThreadFactory("test"), threadContext ); @@ -209,6 +210,7 @@ public void testScaleDown() throws Exception { max, between(1, 100), TimeUnit.MILLISECONDS, + randomBoolean(), EsExecutors.daemonThreadFactory("test"), threadContext ); diff --git a/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java b/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java index f4ad27d23e5e4..4bfd8ccc4c7f4 100644 --- a/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java @@ -93,6 +93,7 @@ public void startResolver() { 2, 60, TimeUnit.SECONDS, + false, threadFactory, threadPool.getThreadContext() ); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index 8fcb0630c67b3..e264088903d04 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -10,18 +10,32 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionHandler; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.CheckedRunnable; +import org.hamcrest.Matcher; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; 
import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Function; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class ScalingThreadPoolTests extends ESThreadPoolTestCase { @@ -183,6 +197,232 @@ public void testScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws Int })); } + public void testScalingThreadPoolRejectAfterShutdown() throws Exception { + final boolean rejectAfterShutdown = randomBoolean(); + final int min = randomIntBetween(1, 4); + final int max = randomIntBetween(min, 16); + + final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling( + getTestName().toLowerCase(Locale.ROOT), + min, + max, + randomLongBetween(0, 100), + TimeUnit.MILLISECONDS, + rejectAfterShutdown, + EsExecutors.daemonThreadFactory(getTestName().toLowerCase(Locale.ROOT)), + new ThreadContext(Settings.EMPTY) + ); + try { + final AtomicLong executed = new AtomicLong(); + final AtomicLong rejected = new AtomicLong(); + final AtomicLong failed = new AtomicLong(); + + final CountDownLatch latch = new CountDownLatch(max); + final CountDownLatch block = new CountDownLatch(1); + for (int i = 0; i < max; i++) { + execute(scalingExecutor, () -> { + try { + latch.countDown(); + block.await(); + } catch (InterruptedException e) { + fail(e.toString()); + } + }, executed, rejected, failed); + } + latch.await(); + + assertThat(scalingExecutor.getCompletedTaskCount(), equalTo(0L)); + assertThat(scalingExecutor.getActiveCount(), equalTo(max)); + assertThat(scalingExecutor.getQueue().size(), equalTo(0)); + + final int queued = randomIntBetween(1, 100); + for (int i = 0; i < queued; i++) { + execute(scalingExecutor, () -> {}, 
executed, rejected, failed); + } + + assertThat(scalingExecutor.getCompletedTaskCount(), equalTo(0L)); + assertThat(scalingExecutor.getActiveCount(), equalTo(max)); + assertThat(scalingExecutor.getQueue().size(), equalTo(queued)); + + scalingExecutor.shutdown(); + + final int queuedAfterShutdown = randomIntBetween(1, 100); + for (int i = 0; i < queuedAfterShutdown; i++) { + execute(scalingExecutor, () -> {}, executed, rejected, failed); + } + assertThat(scalingExecutor.getQueue().size(), rejectAfterShutdown ? equalTo(queued) : equalTo(queued + queuedAfterShutdown)); + + block.countDown(); + + assertBusy(() -> assertTrue(scalingExecutor.isTerminated())); + assertThat(scalingExecutor.getActiveCount(), equalTo(0)); + assertThat(scalingExecutor.getQueue().size(), equalTo(0)); + assertThat(failed.get(), equalTo(0L)); + + final Matcher executionsMatcher = rejectAfterShutdown + ? equalTo((long) max + queued) + : allOf(greaterThanOrEqualTo((long) max + queued), lessThanOrEqualTo((long) max + queued + queuedAfterShutdown)); + assertThat(scalingExecutor.getCompletedTaskCount(), executionsMatcher); + assertThat(executed.get(), executionsMatcher); + + final EsRejectedExecutionHandler handler = (EsRejectedExecutionHandler) scalingExecutor.getRejectedExecutionHandler(); + Matcher rejectionsMatcher = rejectAfterShutdown ? equalTo((long) queuedAfterShutdown) : equalTo(0L); + assertThat(handler.rejected(), rejectionsMatcher); + assertThat(rejected.get(), rejectionsMatcher); + + final int queuedAfterTermination = randomIntBetween(1, 100); + for (int i = 0; i < queuedAfterTermination; i++) { + execute(scalingExecutor, () -> {}, executed, rejected, failed); + } + + assertThat(scalingExecutor.getCompletedTaskCount(), executionsMatcher); + assertThat(executed.get(), executionsMatcher); + + rejectionsMatcher = rejectAfterShutdown ? 
equalTo((long) queuedAfterShutdown + queuedAfterTermination) : equalTo(0L); + assertThat(handler.rejected(), rejectionsMatcher); + assertThat(rejected.get(), rejectionsMatcher); + + assertThat(scalingExecutor.getQueue().size(), rejectAfterShutdown ? equalTo(0) : equalTo(queuedAfterTermination)); + assertThat(failed.get(), equalTo(0L)); + + if (rejectAfterShutdown) { + final EsRejectedExecutionException exception = expectThrows( + EsRejectedExecutionException.class, + () -> scalingExecutor.execute(() -> { throw new AssertionError("should be rejected"); }) + ); + assertThat(exception.getLocalizedMessage(), allOf(containsString("rejected execution of "), containsString("(shutdown)"))); + assertThat(exception.isExecutorShutdown(), equalTo(true)); + } + + } finally { + ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testScalingThreadPoolRejectDuringShutdown() throws Exception { + final int min = 1; + final int max = randomIntBetween(min, 3); + + final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling( + getTestName().toLowerCase(Locale.ROOT), + min, + max, + randomLongBetween(0, 100), + TimeUnit.MILLISECONDS, + true, + EsExecutors.daemonThreadFactory(getTestName().toLowerCase(Locale.ROOT)), + new ThreadContext(Settings.EMPTY) + ); + try { + final AtomicLong executed = new AtomicLong(); + final AtomicLong rejected = new AtomicLong(); + final AtomicLong failed = new AtomicLong(); + + final CountDownLatch latch = new CountDownLatch(max); + final CountDownLatch block = new CountDownLatch(1); + for (int i = 0; i < max; i++) { + execute(scalingExecutor, () -> { + try { + latch.countDown(); + block.await(); + } catch (InterruptedException e) { + fail(e.toString()); + } + }, executed, rejected, failed); + } + latch.await(); + + assertThat(scalingExecutor.getCompletedTaskCount(), equalTo(0L)); + assertThat(scalingExecutor.getActiveCount(), equalTo(max)); + assertThat(scalingExecutor.getQueue().size(), equalTo(0)); + + final 
CyclicBarrier barrier = new CyclicBarrier(randomIntBetween(1, 5) + 1); + final Thread[] threads = new Thread[barrier.getParties()]; + + for (int t = 0; t < barrier.getParties(); t++) { + if (t == 0) { + threads[t] = new Thread(() -> { + try { + barrier.await(); + scalingExecutor.shutdown(); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } else { + threads[t] = new Thread(() -> { + try { + barrier.await(); + execute(scalingExecutor, () -> {}, executed, rejected, failed); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } + threads[t].start(); + } + block.countDown(); + + for (Thread thread : threads) { + thread.join(); + } + + assertBusy(() -> assertTrue(scalingExecutor.isTerminated())); + assertThat(scalingExecutor.getCompletedTaskCount(), greaterThanOrEqualTo((long) max)); + final long maxCompletedTasks = (long) max + barrier.getParties() - 1L; + assertThat(scalingExecutor.getCompletedTaskCount(), lessThanOrEqualTo(maxCompletedTasks)); + assertThat(scalingExecutor.getCompletedTaskCount() + rejected.get(), equalTo(maxCompletedTasks)); + assertThat(scalingExecutor.getQueue().size(), equalTo(0)); + assertThat(scalingExecutor.getActiveCount(), equalTo(0)); + + } finally { + ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS); + } + } + + private static void execute( + final Executor executor, + final CheckedRunnable runnable, + final AtomicLong executed, + final AtomicLong rejected, + final AtomicLong failed + ) { + if (randomBoolean()) { + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + runnable.run(); + executed.incrementAndGet(); + } + + @Override + public void onFailure(Exception e) { + failed.incrementAndGet(); + } + + @Override + public void onRejection(Exception e) { + rejected.incrementAndGet(); + } + }); + } else { + try { + executor.execute(() -> { + try { + runnable.run(); + executed.incrementAndGet(); + } catch (Exception e) { + failed.incrementAndGet(); + 
} + }); + } catch (EsRejectedExecutionException e) { + rejected.incrementAndGet(); + } catch (Exception e) { + failed.incrementAndGet(); + } + } + } + public void runScalingThreadPoolTest(final Settings settings, final BiConsumer consumer) throws InterruptedException { ThreadPool threadPool = null; diff --git a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index b8d6c1958321e..36d86ee1dedb3 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -179,12 +179,12 @@ public void testShutdownNowInterrupts() throws Exception { public void testCustomThreadPool() throws Exception { ThreadPool threadPool = null; try { - final ScalingExecutorBuilder scaling = new ScalingExecutorBuilder( "my_pool1", 1, EsExecutors.allocatedProcessors(Settings.EMPTY), - TimeValue.timeValueMinutes(1) + TimeValue.timeValueMinutes(1), + randomBoolean() ); final FixedExecutorBuilder fixed = new FixedExecutorBuilder(Settings.EMPTY, "my_pool2", 1, 1); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index aa0337505c5c2..66cc51206f7b7 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -95,6 +95,7 @@ public static void startHttpServer() throws Exception { 2, 60, TimeUnit.SECONDS, + true, threadFactory, new ThreadContext(Settings.EMPTY) ); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java 
b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index ded988faae762..05894076acaff 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -443,6 +443,7 @@ public InternalTestCluster( Integer.MAX_VALUE, 0, TimeUnit.SECONDS, + true, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b4aa8ee0815e8..963333bf7aed7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1291,6 +1291,7 @@ public List> getExecutorBuilders(Settings unused) { 4, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(1), + false, "xpack.ml.job_comms_thread_pool" ); @@ -1301,6 +1302,7 @@ public List> getExecutorBuilders(Settings unused) { 1, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(10), + false, "xpack.ml.utility_thread_pool" ); @@ -1309,6 +1311,7 @@ public List> getExecutorBuilders(Settings unused) { 1, MAX_MAX_OPEN_JOBS_PER_NODE, TimeValue.timeValueMinutes(1), + false, "xpack.ml.datafeed_thread_pool" ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingServiceTests.java index 3f168ef8c7ee7..040c510e30a50 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingServiceTests.java @@ -99,7 +99,14 @@ public class ModelLoadingServiceTests extends 
ESTestCase { public void setUpComponents() { threadPool = new TestThreadPool( "ModelLoadingServiceTests", - new ScalingExecutorBuilder(UTILITY_THREAD_POOL_NAME, 1, 4, TimeValue.timeValueMinutes(10), "xpack.ml.utility_thread_pool") + new ScalingExecutorBuilder( + UTILITY_THREAD_POOL_NAME, + 1, + 4, + TimeValue.timeValueMinutes(10), + false, + "xpack.ml.utility_thread_pool" + ) ); trainedModelProvider = mock(TrainedModelProvider.class); clusterService = mock(ClusterService.class); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 179e9ca019047..44181295d295c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -575,12 +575,17 @@ public Map getRecoveryStateFactories() { public static ScalingExecutorBuilder[] executorBuilders(Settings settings) { final int processors = EsExecutors.allocatedProcessors(settings); + // searchable snapshots cache thread pools should always reject tasks once they are shutting down, otherwise some threads might be + // waiting for some cache file regions to be populated but this will never happen once the thread pool is shutting down. In order to + // prevent these threads from being blocked, the cache thread pools will reject after shutdown. 
+ final boolean rejectAfterShutdown = true; return new ScalingExecutorBuilder[] { new ScalingExecutorBuilder( CACHE_FETCH_ASYNC_THREAD_POOL_NAME, 0, Math.min(processors * 3, 50), TimeValue.timeValueSeconds(30L), + rejectAfterShutdown, CACHE_FETCH_ASYNC_THREAD_POOL_SETTING ), new ScalingExecutorBuilder( @@ -588,6 +593,7 @@ public static ScalingExecutorBuilder[] executorBuilders(Settings settings) { 0, 16, TimeValue.timeValueSeconds(30L), + rejectAfterShutdown, CACHE_PREWARMING_THREAD_POOL_SETTING ) }; } diff --git a/x-pack/snapshot-tool/src/main/java/org/elasticsearch/snapshots/AbstractRepository.java b/x-pack/snapshot-tool/src/main/java/org/elasticsearch/snapshots/AbstractRepository.java index 53267beb2a7df..f4aaabc06a7f5 100644 --- a/x-pack/snapshot-tool/src/main/java/org/elasticsearch/snapshots/AbstractRepository.java +++ b/x-pack/snapshot-tool/src/main/java/org/elasticsearch/snapshots/AbstractRepository.java @@ -211,6 +211,7 @@ public void cleanup() throws IOException { parallelism, 10L, TimeUnit.SECONDS, + true, EsExecutors.daemonThreadFactory("snapshot_cleanup_tool"), new ThreadContext(Settings.EMPTY) );