Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid redundant connections in ClusterConnectionManager #77196

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,25 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil

currentListener.addListener(listener);

// It's possible that a connection completed, and the pendingConnections entry was removed, between the calls to
// connectedNodes.containsKey and pendingConnections.putIfAbsent above, so we check again to make sure we don't open a redundant
// extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending.
if (connectedNodes.containsKey(node)) {
ListenableFuture<Void> future = pendingConnections.remove(node);
assert future == currentListener : "Listener in pending map is different than the expected listener";
connectingRefCounter.decRef();
future.onResponse(null);
return;
}

final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap(
ignored -> {
assert Transports.assertNotTransportThread("connection validator success");
try {
if (connectedNodes.putIfAbsent(node, conn) != null) {
assert false : "redundant conection to " + node;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClusterConnectionManagerTests#testConcurrentConnects exercises this - I can make it fail reasonably often by adding a few Thread.sleep calls:

diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java
index 415769596a4..00bb8423d7b 100644
--- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java
+++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java
@@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -109,6 +110,12 @@ public class ClusterConnectionManager implements ConnectionManager {
             return;
         }

+        try {
+            Thread.sleep(Randomness.get().nextInt(50));
+        } catch (InterruptedException e) {
+            throw new AssertionError("unexpected", e);
+        }
+
         final ListenableFuture<Void> currentListener = new ListenableFuture<>();
         final ListenableFuture<Void> existingListener = pendingConnections.putIfAbsent(node, currentListener);
         if (existingListener != null) {
@@ -126,7 +133,7 @@ public class ClusterConnectionManager implements ConnectionManager {
         // It's possible that a connection completed, and the pendingConnections entry was removed, between the calls to
         // connectedNodes.containsKey and pendingConnections.putIfAbsent above, so we check again to make sure we don't open a redundant
         // extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending.
-        if (connectedNodes.containsKey(node)) {
+        if (connectedNodes.containsKey(node) && false) {
             ListenableFuture<Void> future = pendingConnections.remove(node);
             assert future == currentListener : "Listener in pending map is different than the expected listener";
             connectingRefCounter.decRef();
diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
index f44217e276a..2b11867d21f 100644
--- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
+++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
@@ -163,6 +163,7 @@ public class ClusterConnectionManagerTests extends ESTestCase {
             Thread thread = new Thread(() -> {
                 try {
                     barrier.await();
+                    Thread.sleep(between(0, 50));
                 } catch (InterruptedException | BrokenBarrierException e) {
                     throw new RuntimeException(e);
                 }

logger.debug("existing connection to node [{}], closing new redundant connection", node);
IOUtils.closeWhileHandlingException(conn);
} else {
Expand Down