-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid redundant connections in ClusterConnectionManager #77196
Avoid redundant connections in ClusterConnectionManager #77196
Conversation
Today it's possible to open two connections to a node, and then we notice when registering the second connection and close it instead. Fixing elastic#67873 will require us to keep tighter control over the identity and lifecycle of each connection, and opening redundant connections gets in the way of this. This commit adds a check for an existing connection _after_ marking the connection as pending, which guarantees that we don't open those redundant connections.
Pinging @elastic/es-distributed (Team:Distributed) |
final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef); | ||
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> { | ||
connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap( | ||
ignored -> { | ||
assert Transports.assertNotTransportThread("connection validator success"); | ||
try { | ||
if (connectedNodes.putIfAbsent(node, conn) != null) { | ||
assert false : "redundant conection to " + node; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ClusterConnectionManagerTests#testConcurrentConnects
exercises this - I can make it fail reasonably often by adding a few Thread.sleep
calls:
diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java
index 415769596a4..00bb8423d7b 100644
--- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java
+++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java
@@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -109,6 +110,12 @@ public class ClusterConnectionManager implements ConnectionManager {
return;
}
+ try {
+ Thread.sleep(Randomness.get().nextInt(50));
+ } catch (InterruptedException e) {
+ throw new AssertionError("unexpected", e);
+ }
+
final ListenableFuture<Void> currentListener = new ListenableFuture<>();
final ListenableFuture<Void> existingListener = pendingConnections.putIfAbsent(node, currentListener);
if (existingListener != null) {
@@ -126,7 +133,7 @@ public class ClusterConnectionManager implements ConnectionManager {
// It's possible that a connection completed, and the pendingConnections entry was removed, between the calls to
// connectedNodes.containsKey and pendingConnections.putIfAbsent above, so we check again to make sure we don't open a redundant
// extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending.
- if (connectedNodes.containsKey(node)) {
+ if (connectedNodes.containsKey(node) && false) {
ListenableFuture<Void> future = pendingConnections.remove(node);
assert future == currentListener : "Listener in pending map is different than the expected listener";
connectingRefCounter.decRef();
diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
index f44217e276a..2b11867d21f 100644
--- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
+++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
@@ -163,6 +163,7 @@ public class ClusterConnectionManagerTests extends ESTestCase {
Thread thread = new Thread(() -> {
try {
barrier.await();
+ Thread.sleep(between(0, 50));
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, nice find :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Thanks both :) |
Today it's possible to open two connections to a node, and then we notice when registering the second connection and close it instead. Fixing #67873 will require us to keep tighter control over the identity and lifecycle of each connection, and opening redundant connections gets in the way of this. This commit adds a check for an existing connection _after_ marking the connection as pending, which guarantees that we don't open those redundant connections.
It' necessary to add the second |
The comment added in this PR notes that it is indeed not needed for correctness, but explains why I left it in anyway: elasticsearch/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java Line 115 in 0cc9778
We're almost always going to be re-using an existing connection, no need to faff around with |
Today it's possible to open two connections to a node, and then we
notice when registering the second connection and close it instead.
Fixing #67873 will require us to keep tighter control over the identity
and lifecycle of each connection, and opening redundant connections gets
in the way of this. This commit adds a check for an existing connection
after marking the connection as pending, which guarantees that we
don't open those redundant connections.