From 9a133e9348b63db87ba3ab01e0239625de43d29b Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 24 Jun 2018 20:23:15 +0100 Subject: [PATCH] Introduce CONNECT threadpool Today we attempt to (re-)connect to our peers using the management threadpool. However, during a network partition there may sometimes be a large number of concurrent connection attempts. Connection attempts to partitioned nodes or to nodes in containers that are no longer running can hang until they time out, possibly blocking other reconnection attempts and other management activity for an extended period of time. Moreover, connecting to a peer is a relatively lightweight operation, so it is reasonable to attempt a lot of them in parallel. This change introduces a separate threadpool solely for connecting to peers. Fixes #29023. --- docs/reference/cat/thread_pool.asciidoc | 2 ++ docs/reference/modules/threadpool.asciidoc | 4 ++++ .../elasticsearch/cluster/NodeConnectionsService.java | 6 +++--- .../java/org/elasticsearch/threadpool/ThreadPool.java | 4 ++++ .../threadpool/UpdateThreadPoolSettingsTests.java | 10 +++++++++- 5 files changed, 22 insertions(+), 4 deletions(-) diff --git a/docs/reference/cat/thread_pool.asciidoc b/docs/reference/cat/thread_pool.asciidoc index 9528b7829e37f..5db646cd17424 100644 --- a/docs/reference/cat/thread_pool.asciidoc +++ b/docs/reference/cat/thread_pool.asciidoc @@ -15,6 +15,7 @@ Which looks like: [source,txt] -------------------------------------------------- node-0 analyze 0 0 0 +node-0 connect 0 0 0 node-0 fetch_shard_started 0 0 0 node-0 fetch_shard_store 0 0 0 node-0 flush 0 0 0 @@ -45,6 +46,7 @@ The second column is the thread pool name -------------------------------------------------- name analyze +connect fetch_shard_started fetch_shard_store flush diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 515959e4ea580..6db89d4f1d456 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ 
b/docs/reference/modules/threadpool.asciidoc @@ -48,6 +48,10 @@ There are several thread pools, but the important ones include: Mainly for java client executing of action when listener threaded is set to true. Thread pool type is `scaling` with a default max of `min(10, (# of available processors)/2)`. +`connect`:: + For connecting to other nodes in the cluster. Thread pool type is `scaling` with a + keep-alive of `10s` and a max of `min(100, (# of available processors)*10)`. + Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `bulk` thread pool to have more threads: diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index 998cd5ba0a870..3af6f762a8214 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -90,7 +90,7 @@ public void connectToNodes(DiscoveryNodes discoveryNodes) { latch.countDown(); } else { // spawn to another thread to do in parallel - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + threadPool.executor(ThreadPool.Names.CONNECT).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { // both errors and rejections are logged here. 
the service @@ -185,14 +185,14 @@ protected void doRun() { @Override public void onAfter() { if (lifecycle.started()) { - backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this); + backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.CONNECT, this); } } } @Override protected void doStart() { - backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ConnectionChecker()); + backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.CONNECT, new ConnectionChecker()); } @Override diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 51a4adec8d16d..76499df706154 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -79,6 +79,7 @@ public static class Names { public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; + public static final String CONNECT = "connect"; } public enum ThreadPoolType { @@ -135,6 +136,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); + map.put(Names.CONNECT, ThreadPoolType.SCALING); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -186,6 +188,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... 
customBui builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); + builders.put(Names.CONNECT, new ScalingExecutorBuilder(Names.CONNECT, 1, + boundedBy(10 * availableProcessors, 10, 100), TimeValue.timeValueSeconds(10))); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); diff --git a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index ea281f7d9ae1e..2b57480db901f 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -133,7 +133,7 @@ public void testScalingExecutorType() throws InterruptedException { final int expectedMinimum = "generic".equals(threadPoolName) ? 4 : 1; assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedMinimum)); assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); - final long expectedKeepAlive = "generic".equals(threadPoolName) ? 
30 : 300; + final long expectedKeepAlive = expectedKeepAlive(threadPoolName); assertThat(info(threadPool, threadPoolName).getKeepAlive().seconds(), equalTo(expectedKeepAlive)); assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); @@ -142,6 +142,14 @@ public void testScalingExecutorType() throws InterruptedException { } } + private long expectedKeepAlive(String threadPoolName) { + switch (threadPoolName) { + case "generic": return 30; + case "connect": return 10; + default: return 300; + } + } + public void testShutdownNowInterrupts() throws Exception { String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); ThreadPool threadPool = null;