Polling cluster formation state for master-is-stable health indicator #88397
@@ -0,0 +1,5 @@
pr: 88397
summary: Polling cluster formation state for master-is-stable health indicator
area: Health
type: enhancement
issues: []
@@ -10,6 +10,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -19,7 +24,14 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.PrintWriter;
@@ -30,6 +42,9 @@
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -47,6 +62,7 @@
 */
public class CoordinationDiagnosticsService implements ClusterStateListener {
    private final ClusterService clusterService;
    private final TransportService transportService;
    private final Coordinator coordinator;
    private final MasterHistoryService masterHistoryService;
    /**
@@ -63,6 +79,19 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
     */
    private final int unacceptableIdentityChanges;

    /*
     * This is a list of tasks that periodically reach out to other master-eligible nodes to get their ClusterFormationStates for
     * diagnosis. This field is only ever accessed on the cluster change event thread, so there is no need to protect it for
     * thread safety.
     */
    private List<Scheduler.Cancellable> clusterFormationInfoTasks = List.of();
    /*
     * This field holds the results of the tasks in the clusterFormationInfoTasks field above. The field is accessed (reads/writes) from
     * multiple threads, but the reference itself is only ever changed on the cluster change event thread.
     */
    // Non-private for testing
    volatile ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> clusterFormationResponses = new ConcurrentHashMap<>();

    private static final Logger logger = LogManager.getLogger(CoordinationDiagnosticsService.class);

    /**
@@ -98,10 +127,12 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {

    public CoordinationDiagnosticsService(
        ClusterService clusterService,
        TransportService transportService,
        Coordinator coordinator,
        MasterHistoryService masterHistoryService
    ) {
        this.clusterService = clusterService;
        this.transportService = transportService;
        this.coordinator = coordinator;
        this.masterHistoryService = masterHistoryService;
        this.nodeHasMasterLookupTimeframe = NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING.get(clusterService.getSettings());
@@ -410,6 +441,204 @@ public void clusterChanged(ClusterChangedEvent event) {
                }
            }
        }
        if (currentMaster == null && clusterService.localNode().isMasterNode()) {
            /*
             * This begins polling all master-eligible nodes for cluster formation information. However, there is a 10-second delay
             * before it starts, so in the normal situation where a master transition flips from master1 -> null -> master2, the
             * polling tasks will be canceled before any requests are actually made.
             */
            beginPollingClusterFormationInfo();
        } else {
            cancelPollingClusterFormationInfo();
        }
    }

    private void beginPollingClusterFormationInfo() {
        cancelPollingClusterFormationInfo();
        clusterFormationInfoTasks = getMasterEligibleNodes().stream()
            .map(masterNode -> beginPollingClusterFormationInfo(masterNode, clusterFormationResponses))
            .collect(Collectors.toList());
    }

    private void cancelPollingClusterFormationInfo() {
        clusterFormationInfoTasks.forEach(Scheduler.Cancellable::cancel);
        /*
         * Recreates the map so that we don't read old information, or worse, get stuck with information about a node that has been
         * removed from the cluster.
         */
        clusterFormationResponses = new ConcurrentHashMap<>();
    }

    // Non-private for testing
    Scheduler.Cancellable beginPollingClusterFormationInfo(
        DiscoveryNode node,
        final ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> nodeToClusterFormationStateMap
    ) {
        return new PollClusterFormationStateTask(node, nodeToClusterFormationStateMap).pollUntilCancelled();
    }

    /*
     * This inner class wraps the logic of polling a master-eligible node for its cluster formation information (which is needed in the
     * event that the cluster cannot elect a master node).
     */
    // Non-private for testing
    class PollClusterFormationStateTask {
        /**
         * The node that is being polled
         */
        private final DiscoveryNode node;
        /**
         * This is a reference to the global nodeToClusterFormationStateMap that was current at the time this object was constructed.
         * The global map is recreated whenever the task is cancelled. Having this reference prevents accidental writes to that map
         * after cancellation.
         */
        private final ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> nodeToClusterFormationStateMap;

Reviewer (on the javadoc above): This is a bit confusing, as it implies we're copying it (i.e. "that was current at the time this object was constructed"), but it also "references" it, which means it's going to change in line with the global map. I'm also not sure about the accidental writes to "that" map (it's "this" map, isn't it :) ?), nor what's accidental about the said writes. Could we maybe talk about why we have this map here, as opposed to the implementation details of how it arrived here? (Apologies if I'm misunderstanding its purpose.) Update: maybe we should drop

Author: Below is a scenario showing why I'm copying the

We begin polling master eligible nodes A, B, C

So now we have results for A, B, C, and D, even though C is no longer part of the cluster. So we get weird results when we look to see if we can form a quorum, or if C knows about the existence of D. Or even if it's not as extreme as a node being replaced, we have results that might be out of date, causing confusion. My simple way to avoid worrying about all of this was to just pass the method a reference to the clusterFormationResponses map that it is supposed to be using.
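
To make the scenario described above concrete, here is a minimal, self-contained sketch (plain Java; the class name and node names are illustrative, not the actual Elasticsearch types) of why each polling round works against the map reference it was created with: a late write from a cancelled round lands in the old map, so the freshly created map never sees stale entries.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: simulates a cancelled polling round writing a late result.
public class StaleMapSketch {
    // Stands in for clusterFormationResponses; the reference is swapped when polling is cancelled.
    static volatile ConcurrentMap<String, String> responses = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        // A polling round captures the map that is current when it starts.
        ConcurrentMap<String, String> capturedByOldRound = responses;

        // Polling is cancelled (e.g. a master was elected); the map is recreated so that
        // stale entries, such as one for a node that has left the cluster, are dropped.
        responses = new ConcurrentHashMap<>();

        // A response from the old round arrives late and is written via the captured reference.
        capturedByOldRound.put("nodeC", "stale cluster formation state");

        // The fresh map is unaffected, so later diagnosis never sees the stale entry.
        System.out.println("old map: " + capturedByOldRound); // {nodeC=stale cluster formation state}
        System.out.println("new map: " + responses);          // {}
    }
}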
        /**
         * This is a wrapper Cancellable. After polling begins, every time a new remote request is scheduled (about once every 10
         * seconds) we get a new Cancellable. This wraps all of them so that we only have to cancel the single Cancellable that is
         * initially returned from pollUntilCancelled() in order to cancel them all.
         */
        private final MultipleCancellablesWrapper multipleCancellablesWrapper;

        /**
         * This constructor is used to create the root task. It initializes the MultipleCancellablesWrapper that is shared between all
         * the related tasks.
         *
         * @param node The node to poll for cluster formation information
         * @param nodeToClusterFormationStateMap A reference to the global nodeToClusterFormationStateMap
         */
        PollClusterFormationStateTask(
            DiscoveryNode node,
            final ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> nodeToClusterFormationStateMap
        ) {
            this(node, nodeToClusterFormationStateMap, new MultipleCancellablesWrapper());
        }

        private PollClusterFormationStateTask(
            DiscoveryNode node,
            final ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> nodeToClusterFormationStateMap,
            MultipleCancellablesWrapper multipleCancellablesWrapper
        ) {
            this.node = node;
            this.nodeToClusterFormationStateMap = nodeToClusterFormationStateMap;
            this.multipleCancellablesWrapper = multipleCancellablesWrapper;
        }

        /**
         * This method returns a Cancellable quickly, but in the background it schedules a query of the remote node's cluster
         * formation state to run in 10 seconds, and repeats doing that until cancel() is called on the returned Cancellable.
         *
         * @return a Cancellable that, when cancelled, cancels all polling attempts scheduled for this node
         */
        public Scheduler.Cancellable pollUntilCancelled() {

Reviewer (on this method): It seems to me that

Whilst here, is there a need for

Maybe the signature I'm proposing is:

Author: Oh, good point about PollClusterFormationStateTask's existence. In an earlier incarnation this was an actual Runnable, and

Author: Here's the signature I have now:

Author: OK, the method now accepts a

Reviewer: I think this is a step in the right direction, thanks for implementing it!

Would this be an avenue for simplification? Say the method signature is as follows

This means the caller is responsible for re-calling

I think this would remove the need for the

Consuming the

This would reduce the scope of

What do you think?

Author: Unless I'm misunderstanding something, we can't use a

Author: Also, I think your proposal exposes us to the race condition here, right (although I'm not 100% sure that causes big problems)? #88397 (comment)

Reviewer: Ah, apologies, I did mean

I think it's a bit difficult to follow; that's why I suggested reducing the scope of

Author: OK, I was mistaken about where it was failing. As we discussed offline, the actual reason it was failing was because the code at #88397 (comment) would stop polling after 2 attempts. We're now doing something very similar to that, but recursive so that it continues polling until cancelled.

            StepListener<Releasable> connectionListener = new StepListener<>();
            StepListener<ClusterFormationInfoAction.Response> clusterFormationInfoResponseListener = new StepListener<>();

Reviewer (on the declarations above): Would this be more readable if named

            long startTime = System.nanoTime();
            connectionListener.whenComplete(releasable -> {
                logger.trace("Opened connection to {}, making cluster coordination info request", node);
                // If we don't get a response in 10 seconds that is a failure worth capturing on its own:
                final TimeValue transportTimeout = TimeValue.timeValueSeconds(10);
                transportService.sendRequest(
                    node,
                    ClusterFormationInfoAction.NAME,
                    new ClusterFormationInfoAction.Request(),
                    TransportRequestOptions.timeout(transportTimeout),
                    new ActionListenerResponseHandler<>(
                        ActionListener.runAfter(
                            ActionListener.runBefore(clusterFormationInfoResponseListener, () -> Releasables.close(releasable)),
                            () -> new PollClusterFormationStateTask(node, nodeToClusterFormationStateMap, multipleCancellablesWrapper)
                                .pollUntilCancelled()
                        ),
                        ClusterFormationInfoAction.Response::new
                    )
                );
            }, e -> {
                logger.warn("Exception connecting to master node", e);
                nodeToClusterFormationStateMap.put(node, new ClusterFormationStateOrException(e));
                /*
                 * Note: We can't call pollUntilCancelled() in a runAfter() in this case because when the corresponding
                 * onResponse() is called we actually aren't finished yet (because it makes another asynchronous request).
                 */
                new PollClusterFormationStateTask(node, nodeToClusterFormationStateMap, multipleCancellablesWrapper).pollUntilCancelled();
            });

Reviewer (on the ActionListener wrapping above): Could this be simplified? Even if just documenting the order of execution? i.e.

Hmm, actually is the

Could the last step (

Author: It could, but it would have to be done twice (once in the success path and once in the fail path). The nice thing about
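
Since the thread above asks about the order of execution, here is a rough, self-contained analogue with hand-rolled combinators (not the real ActionListener API, which also handles exceptions thrown by the wrapped runnables): on either the success or the failure path the intent is to close the connection first, then deliver the result to the response listener, then schedule the next poll.

// Illustrative only: a plain-Java stand-in showing the execution order of the wrapped listeners.
public class ListenerOrderSketch {

    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Runs `before` first, then delegates (mirrors the intent of a run-before wrapper).
    static <T> Listener<T> runBefore(Listener<T> delegate, Runnable before) {
        return new Listener<>() {
            public void onResponse(T response) { before.run(); delegate.onResponse(response); }
            public void onFailure(Exception e) { before.run(); delegate.onFailure(e); }
        };
    }

    // Delegates first, then runs `after` on either path (mirrors the intent of a run-after wrapper).
    static <T> Listener<T> runAfter(Listener<T> delegate, Runnable after) {
        return new Listener<>() {
            public void onResponse(T response) { try { delegate.onResponse(response); } finally { after.run(); } }
            public void onFailure(Exception e) { try { delegate.onFailure(e); } finally { after.run(); } }
        };
    }

    public static void main(String[] args) {
        Listener<String> responseListener = new Listener<>() {
            public void onResponse(String response) { System.out.println("2. record cluster formation state: " + response); }
            public void onFailure(Exception e) { System.out.println("2. record exception: " + e); }
        };
        Listener<String> chained = runAfter(
            runBefore(responseListener, () -> System.out.println("1. close connection")),
            () -> System.out.println("3. schedule next poll")
        );

        chained.onResponse("ok");                    // prints steps 1, 2, 3
        chained.onFailure(new Exception("timeout")); // same order on the failure path
    }
}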

            clusterFormationInfoResponseListener.whenComplete(response -> {
                long endTime = System.nanoTime();
                logger.trace("Received cluster coordination info from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
                nodeToClusterFormationStateMap.put(node, new ClusterFormationStateOrException(response.getClusterFormationState()));
            }, e -> {
                logger.warn("Exception in cluster coordination info request to master node", e);
                nodeToClusterFormationStateMap.put(node, new ClusterFormationStateOrException(e));
            });

            Scheduler.ScheduledCancellable scheduledCancellable = transportService.getThreadPool().schedule(() -> {
                Version minSupportedVersion = Version.V_8_4_0;
                if (node.getVersion().onOrAfter(minSupportedVersion) == false) { // This was introduced in 8.4.0
                    logger.trace(
                        "Cannot get cluster coordination info for {} because it is at version {} and {} is required",
                        node,
                        node.getVersion(),
                        minSupportedVersion
                    );
                } else {
                    transportService.connectToNode(
                        // Note: This connection must be explicitly closed in the connectionListener
                        node,
                        ConnectionProfile.buildDefaultConnectionProfile(clusterService.getSettings()),
                        connectionListener
                    );
                }
            }, new TimeValue(10, TimeUnit.SECONDS), ThreadPool.Names.SAME);
            multipleCancellablesWrapper.addNewCancellable(scheduledCancellable);
            return multipleCancellablesWrapper;
        }
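
To illustrate the pattern the earlier review thread converged on ("recursive so that it continues polling until cancelled"), here is a minimal, self-contained sketch that uses a plain ScheduledExecutorService and an AtomicBoolean in place of the TransportService/Scheduler machinery; the names are illustrative and the 10-second delay simply mirrors the delay used above.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Illustrative only: polls "a node" every 10 seconds, rescheduling itself after each
// attempt (success or failure) until cancel() flips the shared flag.
public class RecursivePollingSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void pollUntilCancelled(String node, Consumer<String> resultConsumer) {
        if (cancelled.get()) {
            return; // stop the chain once cancelled
        }
        scheduler.schedule(() -> {
            try {
                resultConsumer.accept("cluster formation state from " + node); // stand-in for the remote request
            } catch (Exception e) {
                resultConsumer.accept("exception polling " + node + ": " + e);
            } finally {
                pollUntilCancelled(node, resultConsumer); // reschedule the next attempt
            }
        }, 10, TimeUnit.SECONDS);
    }

    void cancel() {
        cancelled.set(true);
        scheduler.shutdownNow(); // also drops any attempt that is already queued
    }

    public static void main(String[] args) throws InterruptedException {
        RecursivePollingSketch poller = new RecursivePollingSketch();
        poller.pollUntilCancelled("nodeA", System.out::println);
        Thread.sleep(25_000); // long enough for roughly two polls
        poller.cancel();
    }
}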
|
||
/** | ||
* This class represents a collection of related Cancellables. If one is cancelled, they are all considered cancelled. If cancel() | ||
* is called on this method, then cancel() is called on all child Cancellables. | ||
*/ | ||
static class MultipleCancellablesWrapper implements Scheduler.Cancellable { | ||
/* | ||
* This field will be read from and written to on multiple threads. CopyOnWriteArrayList is used here to avoid explicitly | ||
* synchronizing access and to avoid ConcurrentModificationExceptions when iterating through the delegates. | ||
*/ | ||
private final List<Scheduler.Cancellable> delegates = new CopyOnWriteArrayList<>(); | ||
|
||
@Override | ||
public boolean cancel() { | ||
delegates.forEach(Scheduler.Cancellable::cancel); | ||
return true; | ||
} | ||
|
||
@Override | ||
public boolean isCancelled() { | ||
return delegates.stream().anyMatch(Scheduler.Cancellable::isCancelled); | ||
} | ||
|
||
public void addNewCancellable(Scheduler.Cancellable cancellable) { | ||
delegates.add(cancellable); | ||
} | ||
} | ||
} | ||
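
As a usage illustration of the wrapper's contract described above (calling cancel() once cancels every delegate, and isCancelled() reports true if any delegate is cancelled), a stand-alone analogue might look like the following; the Cancellable interface here is a simplified stand-in for Scheduler.Cancellable, not the real API.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative only: a plain-Java analogue of the cancel-all / any-cancelled contract.
public class CancellableWrapperSketch {

    interface Cancellable {
        boolean cancel();
        boolean isCancelled();
    }

    static class SimpleCancellable implements Cancellable {
        private volatile boolean cancelled;
        public boolean cancel() { cancelled = true; return true; }
        public boolean isCancelled() { return cancelled; }
    }

    static class Wrapper implements Cancellable {
        private final List<Cancellable> delegates = new CopyOnWriteArrayList<>();
        void add(Cancellable cancellable) { delegates.add(cancellable); }
        public boolean cancel() { delegates.forEach(Cancellable::cancel); return true; }
        public boolean isCancelled() { return delegates.stream().anyMatch(Cancellable::isCancelled); }
    }

    public static void main(String[] args) {
        Wrapper wrapper = new Wrapper();
        SimpleCancellable first = new SimpleCancellable();
        SimpleCancellable second = new SimpleCancellable();
        wrapper.add(first);
        wrapper.add(second);

        System.out.println(wrapper.isCancelled()); // false: nothing cancelled yet
        wrapper.cancel();                          // cancels both delegates
        System.out.println(first.isCancelled());   // true
        System.out.println(second.isCancelled());  // true
    }
}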

    // Non-private for testing
    record ClusterFormationStateOrException(
        ClusterFormationFailureHelper.ClusterFormationState clusterFormationState,
        Exception exception
    ) {
        ClusterFormationStateOrException {
            if (clusterFormationState != null && exception != null) {
                throw new IllegalArgumentException("Cluster formation state and exception cannot both be non-null");
            }
        }

        ClusterFormationStateOrException(ClusterFormationFailureHelper.ClusterFormationState clusterFormationState) {
            this(clusterFormationState, null);
        }

        ClusterFormationStateOrException(Exception exception) {
            this(null, exception);
        }
    }

    public record CoordinationDiagnosticsResult(

Reviewer: IMO we shouldn't be constantly calling this cancel... when the cluster is healthy, but only when re-gaining a master. @DaveCTurner, what do you think?

Second reviewer: It's not a huge deal. Maybe we could use null instead of new ConcurrentHashMap<>() to distinguish "we're not polling" from "we're polling but have no entries"?

Author: My thinking was that it's basically a no-op (there won't be any scheduled tasks most of the time), and the minuscule performance hit was worth the risk of complicating the code and/or accidentally not calling it when it needed to be called. I can have cancelPollingClusterFormationInfo set clusterFormationResponses to null and have beginPollingClusterFormationInfo create a new ConcurrentHashMap<>() (I think that's what you're suggesting?) -- that would save a little garbage collection.
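
For reference, a minimal sketch of the alternative floated in this thread (illustrative only, not what the PR implements as of these commits): cancelPollingClusterFormationInfo sets the field to null to mean "not polling", and beginPollingClusterFormationInfo allocates a fresh map, so "polling with no entries yet" is distinguishable from "not polling at all".

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: field and method names loosely mirror the PR.
public class NullMeansNotPollingSketch {
    // null  -> we are not polling at all
    // empty -> we are polling but have received no responses yet
    private volatile ConcurrentMap<String, String> clusterFormationResponses = null;

    void beginPollingClusterFormationInfo() {
        clusterFormationResponses = new ConcurrentHashMap<>(); // allocate only when polling starts
        // ... schedule the polling tasks here ...
    }

    void cancelPollingClusterFormationInfo() {
        // ... cancel the polling tasks here ...
        clusterFormationResponses = null; // distinguishes "not polling" from "polling, no entries yet"
    }

    boolean isPolling() {
        return clusterFormationResponses != null;
    }
}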