diff --git a/docs/reference/cat/thread_pool.asciidoc b/docs/reference/cat/thread_pool.asciidoc index 9528b7829e37f..a5462fc88887a 100644 --- a/docs/reference/cat/thread_pool.asciidoc +++ b/docs/reference/cat/thread_pool.asciidoc @@ -56,6 +56,7 @@ management ml_autodetect (default distro only) ml_datafeed (default distro only) ml_utility (default distro only) +node_connections refresh rollup_indexing (default distro only)` search diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 515959e4ea580..aac6650374029 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -48,6 +48,11 @@ There are several thread pools, but the important ones include: Mainly for java client executing of action when listener threaded is set to true. Thread pool type is `scaling` with a default max of `min(10, (# of available processors)/2)`. +`node_connections`:: + For connecting to other nodes in the cluster. Thread pool type is `scaling` with a + keep-alive of `5m` and a max of `(# of available processors)*2` by default. For larger clusters, + you may need to manually increase the `max` size to adapt to a higher node connection intensity. + Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `bulk` thread pool to have more threads: diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index 998cd5ba0a870..4f43ae2210908 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -89,13 +89,12 @@ public void connectToNodes(DiscoveryNodes discoveryNodes) { if (connected) { latch.countDown(); } else { - // spawn to another thread to do in parallel - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + threadPool.executor(ThreadPool.Names.NODE_CONNECTIONS).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { // both errors and rejections are logged here. the service // will try again after `cluster.nodes.reconnect_interval` on all nodes but the current master. - // On the master, node fault detection will remove these nodes from the cluster as their are not + // On the master, node fault detection will remove these nodes from the cluster as they are not // connected. Note that it is very rare that we end up here on the master. logger.warn(() -> new ParameterizedMessage("failed to connect to {}", node), e); } @@ -185,14 +184,14 @@ protected void doRun() { @Override public void onAfter() { if (lifecycle.started()) { - backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this); + backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.NODE_CONNECTIONS, this); } } } @Override protected void doStart() { - backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ConnectionChecker()); + backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.NODE_CONNECTIONS, new ConnectionChecker()); } @Override diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 51a4adec8d16d..bacb039d05a2a 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.Counter; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; @@ -38,6 +37,7 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.node.Node; import java.io.Closeable; @@ -79,6 +79,7 @@ public static class Names { public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; + public static final String NODE_CONNECTIONS = "node_connections"; } public enum ThreadPoolType { @@ -135,6 +136,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); + map.put(Names.NODE_CONNECTIONS, ThreadPoolType.SCALING); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -186,6 +188,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); + builders.put(Names.NODE_CONNECTIONS, new ScalingExecutorBuilder(Names.NODE_CONNECTIONS, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 828b385f85fa5..f14606125dde5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.Transport; @@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class NodeConnectionsServiceTests extends ESTestCase { @@ -103,6 +105,8 @@ public void testConnectAndDisconnect() { service.disconnectFromNodesExcept(event.state().nodes()); assertConnectedExactlyToNodes(event.state()); + + assertUsingNodeConnectionThreadPool(threadPool); } @@ -129,6 +133,8 @@ public void testReconnect() { transport.randomConnectionExceptions = false; service.new ConnectionChecker().run(); assertConnectedExactlyToNodes(event.state()); + + assertUsingNodeConnectionThreadPool(threadPool); } private void assertConnectedExactlyToNodes(ClusterState state) { @@ -148,6 +154,19 @@ private void assertNotConnected(Iterable nodes) { } } + /** + * Assert only {@link ThreadPool.Names#NODE_CONNECTIONS} threadpool has been used for node connection. + */ + private void assertUsingNodeConnectionThreadPool(ThreadPool threadPool) { + for (ThreadPoolStats.Stats stats : threadPool.stats()) { + if (stats.getName().equals(ThreadPool.Names.NODE_CONNECTIONS)) { + assertThat((int) stats.getCompleted(), greaterThan(0)); + } else { + assertThat((int) stats.getCompleted(), equalTo(0)); + } + } + } + @Override @Before public void setUp() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index a9436053ae902..485308853d71b 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -105,6 +105,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfNumberOfProcessorsMaxFive); sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceNumberOfProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceNumberOfProcessors); + sizes.put(ThreadPool.Names.NODE_CONNECTIONS, ThreadPool::twiceNumberOfProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors); }