From 8cdb9c53b1800995c7abebfb3c9bf8b375077639 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 2 Sep 2021 14:51:10 +0100 Subject: [PATCH] Avoid redundant connections in ClusterConnectionManager Today it's possible to open two connections to a node, and then we notice when registering the second connection and close it instead. Fixing #67873 will require us to keep tighter control over the identity and lifecycle of each connection, and opening redundant connections gets in the way of this. This commit adds a check for an existing connection _after_ marking the connection as pending, which guarantees that we don't open those redundant connections. --- .../transport/ClusterConnectionManager.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index 4b5da7dbbefde..415769596a400 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -123,6 +123,17 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil currentListener.addListener(listener); + // It's possible that a connection completed, and the pendingConnections entry was removed, between the calls to + // connectedNodes.containsKey and pendingConnections.putIfAbsent above, so we check again to make sure we don't open a redundant + // extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending. + if (connectedNodes.containsKey(node)) { + ListenableFuture future = pendingConnections.remove(node); + assert future == currentListener : "Listener in pending map is different than the expected listener"; + connectingRefCounter.decRef(); + future.onResponse(null); + return; + } + final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef); internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> { connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap( @@ -130,6 +141,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil assert Transports.assertNotTransportThread("connection validator success"); try { if (connectedNodes.putIfAbsent(node, conn) != null) { + assert false : "redundant conection to " + node; logger.debug("existing connection to node [{}], closing new redundant connection", node); IOUtils.closeWhileHandlingException(conn); } else {