Skip to content

Commit

Permalink
Connect to new nodes concurrently (#22984)
Browse files Browse the repository at this point in the history
When a node receives a new cluster state from the master, it opens up connections to any new node in the cluster state. That has always been done serially on the cluster state thread, but it has been a long-standing TODO to do this concurrently, which this PR implements.

This is a spin-off of #22828, where an extra handshake is done whenever connecting to a node, which may slow down connecting. Also, the handshake is done in a blocking fashion, which triggers assertions w.r.t. blocking requests on the cluster state thread. Instead of adding an exception, I opted to implement concurrent connections, which both sidesteps the assertion and compensates for the extra handshake.
  • Loading branch information
bleskes committed Feb 9, 2017
1 parent 360e76d commit 43e2382
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
Expand All @@ -37,9 +38,9 @@
import org.elasticsearch.transport.TransportService;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;

import static org.elasticsearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -78,21 +79,53 @@ public NodeConnectionsService(Settings settings, ThreadPool threadPool, Transpor
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
}

public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {

// TODO: do this in parallel (and wait)
public void connectToNodes(DiscoveryNodes discoveryNodes) {
CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize());
for (final DiscoveryNode node : discoveryNodes) {
final boolean connected;
try (Releasable ignored = nodeLocks.acquire(node)) {
nodes.putIfAbsent(node, 0);
validateNodeConnected(node);
connected = transportService.nodeConnected(node);
}
if (connected) {
latch.countDown();
} else {
// spawn to another thread to do in parallel
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
// both errors and rejections are logged here. the service
// will try again after `cluster.nodes.reconnect_interval` on all nodes but the current master.
// On the master, node fault detection will remove these nodes from the cluster as they are not
// connected. Note that it is very rare that we end up here on the master.
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to connect to {}", node), e);
}

@Override
protected void doRun() throws Exception {
try (Releasable ignored = nodeLocks.acquire(node)) {
validateAndConnectIfNeeded(node);
}
}

@Override
public void onAfter() {
latch.countDown();
}
});
}
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* Disconnects from all nodes except the ones provided as parameter
*/
public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
Set<DiscoveryNode> currentNodes = new HashSet<>(nodes.keySet());
for (DiscoveryNode node : nodesToKeep) {
currentNodes.remove(node);
Expand All @@ -110,8 +143,8 @@ public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
}
}

void validateNodeConnected(DiscoveryNode node) {
assert nodeLocks.isHeldByCurrentThread(node) : "validateNodeConnected must be called under lock";
void validateAndConnectIfNeeded(DiscoveryNode node) {
assert nodeLocks.isHeldByCurrentThread(node) : "validateAndConnectIfNeeded must be called under lock";
if (lifecycle.stoppedOrClosed() ||
nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time...
// nothing to do
Expand Down Expand Up @@ -147,7 +180,7 @@ public void onFailure(Exception e) {
protected void doRun() {
for (DiscoveryNode node : nodes.keySet()) {
try (Releasable ignored = nodeLocks.acquire(node)) {
validateNodeConnected(node);
validateAndConnectIfNeeded(node);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
Expand All @@ -41,7 +42,6 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;

import java.io.IOException;
Expand All @@ -60,7 +60,7 @@

public class NodeConnectionsServiceTests extends ESTestCase {

private static ThreadPool THREAD_POOL;
private ThreadPool threadPool;
private MockTransport transport;
private TransportService transportService;

Expand All @@ -84,7 +84,7 @@ private ClusterState clusterStateFromNodes(List<DiscoveryNode> nodes) {

public void testConnectAndDisconnect() {
List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService);
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);

ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
Expand All @@ -108,14 +108,14 @@ public void testConnectAndDisconnect() {

public void testReconnect() {
List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService);
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);

ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);

transport.randomConnectionExceptions = true;

service.connectToNodes(event.nodesDelta().addedNodes());
service.connectToNodes(event.state().nodes());

for (int i = 0; i < 3; i++) {
// simulate disconnects
Expand Down Expand Up @@ -152,8 +152,9 @@ private void assertNotConnected(Iterable<DiscoveryNode> nodes) {
@Before
public void setUp() throws Exception {
super.setUp();
this.threadPool = new TestThreadPool(getClass().getName());
this.transport = new MockTransport();
transportService = new TransportService(Settings.EMPTY, transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, LocalTransportAddress.buildUnique(), UUIDs.randomBase64UUID()), null);
transportService.start();
transportService.acceptIncomingRequests();
Expand All @@ -163,16 +164,11 @@ public void setUp() throws Exception {
@After
public void tearDown() throws Exception {
transportService.stop();
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
super.tearDown();
}

@AfterClass
public static void stopThreadPool() {
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
THREAD_POOL = null;
}


final class MockTransport implements Transport {
private final AtomicLong requestId = new AtomicLong();
Set<DiscoveryNode> connectedNodes = ConcurrentCollections.newConcurrentSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ TimedClusterService createTimedClusterService(boolean makeMaster) throws Interru
threadPool, () -> new DiscoveryNode("node1", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT));
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {
public void connectToNodes(DiscoveryNodes discoveryNodes) {
// skip
}

@Override
public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
// skip
}
});
Expand Down Expand Up @@ -1058,12 +1058,12 @@ public void testDisconnectFromNewlyAddedNodesIfClusterStatePublishingFails() thr
Set<DiscoveryNode> currentNodes = new HashSet<>();
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {
public void connectToNodes(DiscoveryNodes discoveryNodes) {
discoveryNodes.forEach(currentNodes::add);
}

@Override
public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
Set<DiscoveryNode> nodeSet = new HashSet<>();
nodesToKeep.iterator().forEachRemaining(nodeSet::add);
currentNodes.removeIf(node -> nodeSet.contains(node) == false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static junit.framework.TestCase.fail;
Expand All @@ -54,12 +53,12 @@ public static ClusterService createClusterService(ThreadPool threadPool, Discove
threadPool, () -> localNode);
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {
public void connectToNodes(DiscoveryNodes discoveryNodes) {
// skip
}

@Override
public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
// skip
}
});
Expand Down

0 comments on commit 43e2382

Please sign in to comment.