diff --git a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index bda1481130cdd..aab75eb2aad7b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasable; @@ -37,9 +38,9 @@ import org.elasticsearch.transport.TransportService; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import static org.elasticsearch.common.settings.Setting.Property; @@ -78,21 +79,53 @@ public NodeConnectionsService(Settings settings, ThreadPool threadPool, Transpor this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings); } - public void connectToNodes(Iterable discoveryNodes) { - - // TODO: do this in parallel (and wait) + public void connectToNodes(DiscoveryNodes discoveryNodes) { + CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize()); for (final DiscoveryNode node : discoveryNodes) { + final boolean connected; try (Releasable ignored = nodeLocks.acquire(node)) { nodes.putIfAbsent(node, 0); - validateNodeConnected(node); + connected = transportService.nodeConnected(node); + } + if (connected) { + latch.countDown(); + } else { + // spawn to another thread to do in parallel + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + // both errors and rejections are logged here. the service + // will try again after `cluster.nodes.reconnect_interval` on all nodes but the current master. + // On the master, node fault detection will remove these nodes from the cluster as their are not + // connected. Note that it is very rare that we end up here on the master. + logger.warn((Supplier) () -> new ParameterizedMessage("failed to connect to {}", node), e); + } + + @Override + protected void doRun() throws Exception { + try (Releasable ignored = nodeLocks.acquire(node)) { + validateAndConnectIfNeeded(node); + } + } + + @Override + public void onAfter() { + latch.countDown(); + } + }); } } + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } /** * Disconnects from all nodes except the ones provided as parameter */ - public void disconnectFromNodesExcept(Iterable nodesToKeep) { + public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { Set currentNodes = new HashSet<>(nodes.keySet()); for (DiscoveryNode node : nodesToKeep) { currentNodes.remove(node); @@ -110,8 +143,8 @@ public void disconnectFromNodesExcept(Iterable nodesToKeep) { } } - void validateNodeConnected(DiscoveryNode node) { - assert nodeLocks.isHeldByCurrentThread(node) : "validateNodeConnected must be called under lock"; + void validateAndConnectIfNeeded(DiscoveryNode node) { + assert nodeLocks.isHeldByCurrentThread(node) : "validateAndConnectIfNeeded must be called under lock"; if (lifecycle.stoppedOrClosed() || nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time... // nothing to do @@ -147,7 +180,7 @@ public void onFailure(Exception e) { protected void doRun() { for (DiscoveryNode node : nodes.keySet()) { try (Releasable ignored = nodeLocks.acquire(node)) { - validateNodeConnected(node); + validateAndConnectIfNeeded(node); } } } diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index a7cf75e6eb6a8..6efed2638b277 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; @@ -40,7 +41,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; import java.io.IOException; @@ -59,7 +59,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { - private static ThreadPool THREAD_POOL; + private ThreadPool threadPool; private MockTransport transport; private TransportService transportService; @@ -83,7 +83,7 @@ private ClusterState clusterStateFromNodes(List nodes) { public void testConnectAndDisconnect() { List nodes = generateNodes(); - NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService); + NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); ClusterState current = clusterStateFromNodes(Collections.emptyList()); ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); @@ -107,14 +107,14 @@ public void testConnectAndDisconnect() { public void testReconnect() { List nodes = generateNodes(); - NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService); + NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); ClusterState current = clusterStateFromNodes(Collections.emptyList()); ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); transport.randomConnectionExceptions = true; - service.connectToNodes(event.nodesDelta().addedNodes()); + service.connectToNodes(event.state().nodes()); for (int i = 0; i < 3; i++) { // simulate disconnects @@ -151,8 +151,9 @@ private void assertNotConnected(Iterable nodes) { @Before public void setUp() throws Exception { super.setUp(); + this.threadPool = new TestThreadPool(getClass().getName()); this.transport = new MockTransport(); - transportService = new TransportService(Settings.EMPTY, transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null); transportService.start(); transportService.acceptIncomingRequests(); @@ -162,16 +163,11 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { transportService.stop(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; super.tearDown(); } - @AfterClass - public static void stopThreadPool() { - ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); - THREAD_POOL = null; - } - - final class MockTransport implements Transport { private final AtomicLong requestId = new AtomicLong(); Set connectedNodes = ConcurrentCollections.newConcurrentSet(); diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index 6881539939849..809ebdfe17ef7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -126,12 +126,12 @@ TimedClusterService createTimedClusterService(boolean makeMaster) throws Interru emptySet(), Version.CURRENT)); timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override - public void connectToNodes(Iterable discoveryNodes) { + public void connectToNodes(DiscoveryNodes discoveryNodes) { // skip } @Override - public void disconnectFromNodesExcept(Iterable nodesToKeep) { + public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { // skip } }); @@ -1059,12 +1059,12 @@ public void testDisconnectFromNewlyAddedNodesIfClusterStatePublishingFails() thr Set currentNodes = new HashSet<>(); timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override - public void connectToNodes(Iterable discoveryNodes) { + public void connectToNodes(DiscoveryNodes discoveryNodes) { discoveryNodes.forEach(currentNodes::add); } @Override - public void disconnectFromNodesExcept(Iterable nodesToKeep) { + public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { Set nodeSet = new HashSet<>(); nodesToKeep.iterator().forEachRemaining(nodeSet::add); currentNodes.removeIf(node -> nodeSet.contains(node) == false); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index b5c77a71aaf87..07e7a25324b9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -53,12 +53,12 @@ public static ClusterService createClusterService(ThreadPool threadPool, Discove threadPool, () -> localNode); clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override - public void connectToNodes(Iterable discoveryNodes) { + public void connectToNodes(DiscoveryNodes discoveryNodes) { // skip } @Override - public void disconnectFromNodesExcept(Iterable nodesToKeep) { + public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { // skip } });