Polling for cluster diagnostics information #88562
@@ -0,0 +1,5 @@
pr: 88562
summary: Polling for cluster diagnostics information
area: Health
type: enhancement
issues: []
@@ -15,6 +15,7 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
import org.elasticsearch.action.admin.cluster.coordination.CoordinationDiagnosticsAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;

@@ -37,6 +38,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

@@ -47,6 +49,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -63,7 +66,7 @@
* Since this service needs to be able to run when there is no master at all, it does not depend on the dedicated health node (which
* requires the existence of a master).
*/
public class CoordinationDiagnosticsService implements ClusterStateListener {
public class CoordinationDiagnosticsService implements ClusterStateListener, Coordinator.PeerFinderListener {
private final ClusterService clusterService;
private final TransportService transportService;
private final Coordinator coordinator;

@@ -97,6 +100,24 @@ public class CoordinationDiagnosticsService implements ClusterStateListener

private static final Logger logger = LogManager.getLogger(CoordinationDiagnosticsService.class);

/*
* This is a list of tasks that are periodically reaching out to a master eligible node to get its CoordinationDiagnosticsResult for
* diagnosis.
* The field is accessed (reads/writes) from multiple threads, and is also reassigned on multiple threads.
*/
private volatile List<Scheduler.Cancellable> remoteStableMasterHealthIndicatorTasks = null;
/*
* This field holds the result of the tasks in the remoteStableMasterHealthIndicatorTasks field above. The field is accessed
* (reads/writes) from multiple threads, and is also reassigned on multiple threads.
*/
private volatile AtomicReference<RemoteMasterHealthResult> remoteCoordinationDiagnosisResult = new AtomicReference<>();

/*
* The previous two variables (remoteStableMasterHealthIndicatorTasks and remoteCoordinationDiagnosisResult) are reassigned on
* multiple threads. This mutex is used to protect those reassignments.
*/
private final Object remoteDiagnosticsMutex = new Object();

/**
* This is the default amount of time we look back to see if we have had a master at all, before moving on with other checks
*/
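The field comments above describe a deliberate concurrency pattern: the references are volatile so that reads on other threads always observe the latest reassignment, while the mutex makes the check-then-reassign in the begin/cancel paths atomic. A minimal, generic sketch of that pattern (hypothetical, simplified code, not taken from this PR):

```java
import java.util.List;

// Hypothetical illustration of the volatile-reference-plus-mutex pattern described above.
class PollingState {
    private final Object mutex = new Object();
    // volatile: a reader on any thread sees the most recent reassignment without taking the lock
    private volatile List<Runnable> cancellables = null;

    void begin(List<Runnable> newTasks) {
        synchronized (mutex) {            // mutex: the null check and the reassignment happen atomically
            if (cancellables == null) {
                cancellables = newTasks;
            }
        }
    }

    void cancel() {
        synchronized (mutex) {
            if (cancellables != null) {
                cancellables.forEach(Runnable::run); // stand-in for Scheduler.Cancellable::cancel
                cancellables = null;
            }
        }
    }
}
```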
@@ -488,21 +509,23 @@ void beginPollingClusterFormationInfo(
cancellableConsumer.accept(
fetchClusterFormationInfo(
masterEligibleNode,
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
responseConsumer.andThen(
rescheduleClusterFormationFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer)
)
)
);
});
}

/**
* This wraps the responseConsumer in a Consumer that will run rescheduleFetchConsumer() after responseConsumer has
* This wraps the responseConsumer in a Consumer that will run fetchClusterFormationInfo() after responseConsumer has
* completed, adding the resulting Cancellable to cancellableConsumer.
* @param masterEligibleNode The node being polled
* @param responseConsumer The response consumer to be wrapped
* @param cancellableConsumer The list of Cancellables
* @return
*/
private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> rescheduleFetchConsumer(
private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> rescheduleClusterFormationFetchConsumer(
DiscoveryNode masterEligibleNode,
Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> responseConsumer,
Consumer<Scheduler.Cancellable> cancellableConsumer

@@ -511,7 +534,9 @@ private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException
cancellableConsumer.accept(
fetchClusterFormationInfo(
masterEligibleNode,
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
responseConsumer.andThen(
rescheduleClusterFormationFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer)
)
)
);
};

@@ -588,6 +613,159 @@ private Scheduler.Cancellable fetchClusterFormationInfo(
}, new TimeValue(10, TimeUnit.SECONDS), ThreadPool.Names.SAME);
}
private void beginPollingRemoteStableMasterHealthIndicatorService(Collection<DiscoveryNode> masterEligibleNodes) {
synchronized (remoteDiagnosticsMutex) {
if (remoteStableMasterHealthIndicatorTasks == null) {
List<Scheduler.Cancellable> cancellables = new CopyOnWriteArrayList<>();
AtomicReference<RemoteMasterHealthResult> resultReference = new AtomicReference<>();
beginPollingRemoteStableMasterHealthIndicatorService(masterEligibleNodes, resultReference::set, cancellables::add);
remoteStableMasterHealthIndicatorTasks = cancellables;
remoteCoordinationDiagnosisResult = resultReference;
}
}
}

/**
* This method returns quickly, but in the background schedules to query the remote node's cluster diagnostics in 10 seconds, and
* repeats doing that until cancel() is called on all of the Cancellables that this method sends to the cancellableConsumer. This method
* exists (rather than being just part of the beginPollingRemoteStableMasterHealthIndicatorService() above) in order to facilitate
* unit testing.
* @param masterEligibleNodes A collection of all master eligible nodes that may be polled
* @param responseConsumer A consumer for any results produced for a node by this method
* @param cancellableConsumer A consumer for any Cancellable tasks produced by this method
*/
// Non-private for testing
void beginPollingRemoteStableMasterHealthIndicatorService(
Collection<DiscoveryNode> masterEligibleNodes,
Consumer<RemoteMasterHealthResult> responseConsumer,
Consumer<Scheduler.Cancellable> cancellableConsumer
) {
masterEligibleNodes.stream().findAny().ifPresentOrElse(masterEligibleNode -> {
cancellableConsumer.accept(
Review thread on this line:

- It looks like this list will grow indefinitely while the master is missing. Should we simplify here? Is there ever a time where the list will contain more than one validly cancellable object?
- That's a really good point, and the same thing impacts #88397 (it was an unintended side effect of a late change on that one). I'm pretty sure I could just keep one, and whenever I'm about to set the value I cancel the existing one. I think that would be better than potentially leaving a cancellable task running, right?
- It only reschedules the execution of the remote diagnostic after it receives a response from the remote server, right? At that point the item isn't really cancellable any more; it should be done. I think you're safe to just replace it in the regular rescheduling case, and leave any cancellation logic to code outside the regular reschedule loop.
- Also, it looks like the only thing that is actually cancellable is the thread pool task that is waiting 10 seconds. If a master node comes back and it attempts to cancel the task, but it has already started the remote diagnostic, does the cancellation get ignored and the rescheduling continue?
- Oh you're right -- we don't schedule this every 10 seconds. We schedule it every 10 seconds after the previous one completes (or errs out). So we just need to keep track of a single cancellable, and we only need to cancel it in the …
- I believe I looked into this a few weeks ago, and I think I convinced myself that calling cancel issues an interrupt that interrupts network requests, but I could be wrong. Either way, I don't think there's much else we can do about it.
- We might want to move the rescheduling logic into the runnable that is cancellable, then. If we wait until after the results are in to reschedule the remote diagnostic, we might miss the cancellation when a new master shows up that says to stop collecting remote diagnostics. Even if we use an atomic reference to store the active cancellable, it's possible that we can still end up in a runaway execution, I think. Example: … A couple of ways we might be able to guard against this: we could mutex the cancellation and scheduling logic so we can't schedule anything new while something is being cancelled. Once cancelled, the scheduling logic will need to check something to see that it is indeed cancelled, to avoid it scheduling anyway once it obtains the lock. It might be even safer to modify the rescheduling logic to check whether it should not reschedule any more: either the current node has a master, or the diagnostic response indicates there is a master, or something else?
- OK I think I've taken care of this risk now. I'm putting an AtomicBoolean …
- I'm pretty sure it's still a problem (and actually a bigger one, since there we're polling all nodes instead of one). But I'll put that into a separate PR.
fetchCoordinationDiagnostics(
masterEligibleNode,
responseConsumer.andThen(rescheduleDiagnosticsFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
)
);
}, () -> logger.trace("No master eligible node found"));
}
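The review thread above converges on keeping a single in-flight Cancellable plus a cancellation flag that is re-checked under the same mutex used for cancellation, so that nothing can be scheduled after polling has been stopped. A rough sketch of that idea, assuming the surrounding class's existing members (remoteDiagnosticsMutex, fetchCoordinationDiagnostics, Scheduler.Cancellable) and an added java.util.concurrent.atomic.AtomicBoolean import; the names remotePollCancelled, currentRemotePollTask and scheduleNextRemotePoll are illustrative, not from the PR:

```java
// Illustrative only: one cancellable at a time, guarded by the existing remoteDiagnosticsMutex
// and a hypothetical cancellation flag (the AtomicBoolean mentioned in the thread above).
private final AtomicBoolean remotePollCancelled = new AtomicBoolean(false);
private final AtomicReference<Scheduler.Cancellable> currentRemotePollTask = new AtomicReference<>();

private void scheduleNextRemotePoll(DiscoveryNode node, Consumer<RemoteMasterHealthResult> responseConsumer) {
    synchronized (remoteDiagnosticsMutex) {
        // Re-check the flag under the mutex so nothing is scheduled after cancelRemotePoll() has run
        // (the "runaway execution" case discussed above).
        if (remotePollCancelled.get()) {
            return;
        }
        currentRemotePollTask.set(
            fetchCoordinationDiagnostics(node, responseConsumer.andThen(r -> scheduleNextRemotePoll(node, responseConsumer)))
        );
    }
}

private void cancelRemotePoll() {
    synchronized (remoteDiagnosticsMutex) {
        remotePollCancelled.set(true);
        Scheduler.Cancellable task = currentRemotePollTask.getAndSet(null);
        if (task != null) {
            // Only the pending 10-second delay is truly cancellable; a request that is already
            // in flight completes, but the flag above stops it from rescheduling.
            task.cancel();
        }
    }
}
```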
/**
* This wraps the responseConsumer in a Consumer that will run fetchCoordinationDiagnostics() after responseConsumer has
* completed, adding the resulting Cancellable to cancellableConsumer.
* @param masterEligibleNode The node being polled
* @param responseConsumer The response consumer to be wrapped
* @param cancellableConsumer The list of Cancellables
* @return A wrapped Consumer that will run fetchCoordinationDiagnostics()
*/
private Consumer<RemoteMasterHealthResult> rescheduleDiagnosticsFetchConsumer(
DiscoveryNode masterEligibleNode,
Consumer<RemoteMasterHealthResult> responseConsumer,
Consumer<Scheduler.Cancellable> cancellableConsumer
) {
return response -> {
cancellableConsumer.accept(
fetchCoordinationDiagnostics(
masterEligibleNode,
responseConsumer.andThen(rescheduleDiagnosticsFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
)
);
};
}

/**
* This method returns quickly, but in the background schedules to query the remote node's cluster diagnostics in 10 seconds
* unless cancel() is called on the Cancellable that this method returns.
* @param node The node to poll for cluster diagnostics
* @param responseConsumer The consumer of the cluster diagnostics for the node, or the exception encountered while contacting it
* @return A Cancellable for the task that is scheduled to fetch cluster diagnostics
*/
private Scheduler.Cancellable fetchCoordinationDiagnostics(DiscoveryNode node, Consumer<RemoteMasterHealthResult> responseConsumer) {
StepListener<Releasable> connectionListener = new StepListener<>();
StepListener<CoordinationDiagnosticsAction.Response> fetchCoordinationDiagnosticsListener = new StepListener<>();
long startTime = System.nanoTime();
connectionListener.whenComplete(releasable -> {
logger.trace("Opened connection to {}, making master stability request", node);
// If we don't get a response in 10 seconds that is a failure worth capturing on its own:
final TimeValue transportTimeout = TimeValue.timeValueSeconds(10);
transportService.sendRequest(
node,
CoordinationDiagnosticsAction.NAME,
new CoordinationDiagnosticsAction.Request(true),
TransportRequestOptions.timeout(transportTimeout),
new ActionListenerResponseHandler<>(
ActionListener.runBefore(fetchCoordinationDiagnosticsListener, () -> Releasables.close(releasable)),
CoordinationDiagnosticsAction.Response::new
)
);
}, e -> {
logger.warn("Exception connecting to master node", e);
responseConsumer.accept(new RemoteMasterHealthResult(node, null, e));
});

fetchCoordinationDiagnosticsListener.whenComplete(response -> {
long endTime = System.nanoTime();
logger.trace("Received master stability result from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
responseConsumer.accept(new RemoteMasterHealthResult(node, response.getCoordinationDiagnosticsResult(), null));
}, e -> {
logger.warn("Exception in master stability request to master node", e);
responseConsumer.accept(new RemoteMasterHealthResult(node, null, e));
});

return transportService.getThreadPool().schedule(() -> {
Version minSupportedVersion = Version.V_8_4_0;
if (node.getVersion().onOrAfter(minSupportedVersion) == false) { // This was introduced in 8.4.0
logger.trace(
"Cannot get cluster coordination info for {} because it is at version {} and {} is required",
node,
node.getVersion(),
minSupportedVersion
);
} else {
transportService.connectToNode(
// Note: This connection must be explicitly closed in the connectionListener
node,
ConnectionProfile.buildDefaultConnectionProfile(clusterService.getSettings()),
connectionListener
);
}
}, new TimeValue(10, TimeUnit.SECONDS), ThreadPool.Names.SAME);
}
private void cancelPollingRemoteStableMasterHealthIndicatorService() {
synchronized (remoteDiagnosticsMutex) {
if (remoteStableMasterHealthIndicatorTasks != null) {
remoteStableMasterHealthIndicatorTasks.forEach(Scheduler.Cancellable::cancel);
remoteStableMasterHealthIndicatorTasks = null;
remoteCoordinationDiagnosisResult = new AtomicReference<>();
}
}
}
@Override
public void onFoundPeersUpdated() {
/*
* If we are on a non-master-eligible node, and the list of peers in PeerFinder is non-empty, that implies that there is
* currently no master node elected.
* This begins polling a random master-eligible node for its result from this service. However there's a 10-second delay before it
* starts, so in the normal situation where during a master transition it flips from master1 -> null -> master2, the
* polling tasks will be canceled before any requests are actually made.
* Note that this method can be called from multiple threads.
*/
if (clusterService.localNode().isMasterNode() == false) {
/*
* Note that PeerFinder (the source of master eligible nodes) could be updating the master eligible nodes on a different
* thread, so we make a copy here so that it doesn't change for the short duration of this method.
*/
List<DiscoveryNode> masterEligibleNodes = new ArrayList<>(getMasterEligibleNodes());
if (masterEligibleNodes.isEmpty()) {
cancelPollingRemoteStableMasterHealthIndicatorService();
} else {
beginPollingRemoteStableMasterHealthIndicatorService(masterEligibleNodes);
}
}
Review thread on lines +1020 to +1026:

- I'm wondering: are these the correct start/stop triggers for this polling? Even further, for the … Would it be simpler to start the polling of one random master eligible node (as indicated here) from the cluster state change event notification that indicates we lost the master node, and to stop the polling when we've got a master? Would this simplify the design?
- The problem is that non-master-eligible nodes do not have any peers in PeerFinder at the time that …
- Wouldn't the polling retry 10 seconds later? The polling will eventually read a populated list. We could leave the polling task to attempt to read the master eligible nodes using …
}
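For comparison, the alternative raised in the review thread above (keying the polling off master presence in the cluster state rather than off PeerFinder updates) might look roughly like the sketch below inside the class's existing clusterChanged handler. This is a sketch of the reviewer's suggestion under those assumptions, not what the PR implements:

```java
// Sketch of the reviewer's alternative: start/stop remote polling from cluster state changes
// instead of onFoundPeersUpdated(). The real class already implements clusterChanged; this shows
// only the additional branch that would be added. Not the approach taken in this PR.
@Override
public void clusterChanged(ClusterChangedEvent event) {
    if (clusterService.localNode().isMasterNode() == false) {
        if (event.state().nodes().getMasterNode() == null) {
            // No elected master: begin polling one random master-eligible node. The 10-second
            // scheduling delay means a brief master1 -> null -> master2 transition never
            // actually sends a request.
            beginPollingRemoteStableMasterHealthIndicatorService(getMasterEligibleNodes());
        } else {
            // A master is elected again: stop any pending polling tasks.
            cancelPollingRemoteStableMasterHealthIndicatorService();
        }
    }
}
```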
// Non-private for testing
record ClusterFormationStateOrException(
ClusterFormationFailureHelper.ClusterFormationState clusterFormationState,

@@ -715,4 +893,7 @@ public void writeTo(StreamOutput out) throws IOException {
}

}

// Non-private for testing:
record RemoteMasterHealthResult(DiscoveryNode node, CoordinationDiagnosticsResult result, Exception remoteException) {}
}
Review comment (on the rescheduleClusterFormationFetchConsumer Javadoc above):

- I don't think this comment is accurate: I think it will run the rescheduleClusterFormationFetchConsumer.