From 34b1b0b14891698a5cf7b285622f541ca8aebc35 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 23 May 2018 18:35:25 +0200 Subject: [PATCH] Use correct cluster state version for node fault detection (#30810) Since its introduction in ES 1.4, node fault detection has been using the wrong cluster state version to send as part of the ping request, by always using the constant -1 (ClusterState.UNKNOWN_VERSION). This can, in an unfortunate series of events, lead to a situation where a previous stale master can regain its authority and revert the cluster to an older state. This commit makes NodesFaultDetection use the correct current cluster state for sending ping requests, avoiding the situation where a stale master possibly forces a newer master to step down and rejoin the stale one. --- .../discovery/zen/NodesFaultDetection.java | 17 ++++++++++++----- .../discovery/zen/ZenDiscovery.java | 2 +- .../discovery/ZenFaultDetectionTests.java | 8 +++++--- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index 218e6e3f63f95..d19cc98441b79 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -44,6 +44,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Supplier; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; @@ -66,13 +67,16 @@ public void onPingReceived(PingRequest pingRequest) {} private final ConcurrentMap nodesFD = newConcurrentMap(); - private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION; + private final Supplier clusterStateSupplier; private volatile DiscoveryNode localNode; - public NodesFaultDetection(Settings 
settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { + public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, + Supplier clusterStateSupplier, ClusterName clusterName) { super(settings, threadPool, transportService, clusterName); + this.clusterStateSupplier = clusterStateSupplier; + logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); @@ -208,15 +212,18 @@ private boolean running() { return NodeFD.this.equals(nodesFD.get(node)); } + private PingRequest newPingRequest() { + return new PingRequest(node, clusterName, localNode, clusterStateSupplier.get().version()); + } + @Override public void run() { if (!running()) { return; } - final PingRequest pingRequest = new PingRequest(node, clusterName, localNode, clusterStateVersion); final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING) .withTimeout(pingRetryTimeout).build(); - transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new TransportResponseHandler() { + transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler() { @Override public PingResponse newInstance() { return new PingResponse(); @@ -254,7 +261,7 @@ public void handleException(TransportException exp) { } } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this); + transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, this); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7d8485fee09d4..0acfa194d55e4 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ 
b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -205,7 +205,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName); this.masterFD.addListener(new MasterNodeFailureListener()); - this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName); + this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterName); this.nodesFD.addListener(new NodeFaultDetectionListener()); this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings)); diff --git a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index f32e93bb82dbd..03c0df43591ba 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -175,17 +175,19 @@ public void testNodesFaultDetectionConnectOnDisconnect() throws InterruptedExcep final Settings pingSettings = Settings.builder() .put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry) .put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m").build(); - ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(buildNodesForA(true)).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()) + .nodes(buildNodesForA(true)).build(); NodesFaultDetection nodesFDA = new NodesFaultDetection(Settings.builder().put(settingsA).put(pingSettings).build(), - threadPool, serviceA, clusterState.getClusterName()); + threadPool, serviceA, () -> clusterState, clusterState.getClusterName()); nodesFDA.setLocalNode(nodeA); NodesFaultDetection nodesFDB = 
new NodesFaultDetection(Settings.builder().put(settingsB).put(pingSettings).build(), - threadPool, serviceB, clusterState.getClusterName()); + threadPool, serviceB, () -> clusterState, clusterState.getClusterName()); nodesFDB.setLocalNode(nodeB); final CountDownLatch pingSent = new CountDownLatch(1); nodesFDB.addListener(new NodesFaultDetection.Listener() { @Override public void onPingReceived(NodesFaultDetection.PingRequest pingRequest) { + assertThat(pingRequest.clusterStateVersion(), equalTo(clusterState.version())); pingSent.countDown(); } });