diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index 77d321ae9ed3d..cb3823adf08fb 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -110,6 +110,17 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil currentListener.addListener(listener); + // It's possible that a connection completed, and the pendingConnections entry was removed, between the calls to + // connectedNodes.containsKey and pendingConnections.putIfAbsent above, so we check again to make sure we don't open a redundant + // extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending. + if (connectedNodes.containsKey(node)) { + ListenableFuture future = pendingConnections.remove(node); + assert future == currentListener : "Listener in pending map is different than the expected listener"; + connectingRefCounter.decRef(); + future.onResponse(null); + return; + } + final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef); internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> { connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap( @@ -117,6 +128,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil assert Transports.assertNotTransportThread("connection validator success"); try { if (connectedNodes.putIfAbsent(node, conn) != null) { + assert false : "redundant conection to " + node; logger.debug("existing connection to node [{}], closing new redundant connection", node); IOUtils.closeWhileHandlingException(conn); } else {