Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polling cluster formation state for master-is-stable health indicator #88397

Merged
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
85eaacf
Polling cluster formation state
masseyke Jul 8, 2022
85323a4
cancelling all cancellables
masseyke Jul 8, 2022
dc3b581
fixing compilation errors
masseyke Jul 8, 2022
5d215f3
adding unit test
masseyke Jul 8, 2022
9cff8c7
spotlessApply
masseyke Jul 8, 2022
6eabf6d
simplifying
masseyke Jul 8, 2022
3e38a50
more cleanup
masseyke Jul 8, 2022
7ee8a03
more cleanup
masseyke Jul 8, 2022
6605e87
fixing ConcurrentModificationError
masseyke Jul 8, 2022
c5e7260
braking apart huge nested callbacks
masseyke Jul 11, 2022
27ebfd7
using StepListener
masseyke Jul 11, 2022
e14af5b
Unit testing
masseyke Jul 11, 2022
919b5f7
checkstyle
masseyke Jul 11, 2022
630a092
cleaning up
masseyke Jul 11, 2022
f407efc
cleaning up
masseyke Jul 11, 2022
be04f87
Update docs/changelog/88397.yaml
masseyke Jul 11, 2022
f760226
code review feedback
masseyke Jul 12, 2022
4c127d9
removing MultipleCancellablesWrapper(
masseyke Jul 12, 2022
0168cfb
improving comments
masseyke Jul 12, 2022
a282a0d
code review feedback
masseyke Jul 12, 2022
b7937a9
Merge branch 'master' into feature/polling-cluster-formation-state
elasticmachine Jul 12, 2022
9ef64e5
cleaning up
masseyke Jul 12, 2022
69099bb
cleaning up
masseyke Jul 12, 2022
963279f
committing working code
masseyke Jul 13, 2022
c86f450
committing working code
masseyke Jul 13, 2022
fb77e27
code review feedback
masseyke Jul 13, 2022
8cea8a5
cleanup
masseyke Jul 13, 2022
0144e60
code review feedback
masseyke Jul 14, 2022
e7e8949
Merge branch 'master' into feature/polling-cluster-formation-state
elasticmachine Jul 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/88397.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88397
summary: Polling cluster formation state for master-is-stable health indicator
area: Health
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,29 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.PrintWriter;
Expand All @@ -30,7 +43,12 @@
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
Expand All @@ -47,6 +65,7 @@
*/
public class CoordinationDiagnosticsService implements ClusterStateListener {
private final ClusterService clusterService;
private final TransportService transportService;
private final Coordinator coordinator;
private final MasterHistoryService masterHistoryService;
/**
Expand All @@ -63,6 +82,19 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
*/
private final int unacceptableIdentityChanges;

/*
* This is a list of tasks that are periodically reaching out to other master eligible nodes to get their ClusterFormationStates for
* diagnosis.
* The field is accessed (reads/writes) from multiple threads, but the reference itself is only ever changed on the cluster change
* event thread.
*/
private volatile List<Scheduler.Cancellable> clusterFormationInfoTasks = null;
/*
* This field holds the results of the tasks in the clusterFormationInfoTasks field above. The field is accessed (reads/writes) from
* multiple threads, but the reference itself is only ever changed on the cluster change event thread.
*/
private volatile ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> clusterFormationResponses = null;

private static final Logger logger = LogManager.getLogger(CoordinationDiagnosticsService.class);

/**
Expand Down Expand Up @@ -98,10 +130,12 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {

public CoordinationDiagnosticsService(
ClusterService clusterService,
TransportService transportService,
Coordinator coordinator,
MasterHistoryService masterHistoryService
) {
this.clusterService = clusterService;
this.transportService = transportService;
this.coordinator = coordinator;
this.masterHistoryService = masterHistoryService;
this.nodeHasMasterLookupTimeframe = NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING.get(clusterService.getSettings());
Expand Down Expand Up @@ -410,6 +444,168 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
}
if (currentMaster == null && clusterService.localNode().isMasterNode()) {
/*
* This begins polling all master-eligible nodes for cluster formation information. However there's a 10-second delay before it
* starts, so in the normal situation where during a master transition it flips from master1 -> null -> master2, it the
* polling tasks will be canceled before any requests are actually made.
*/
beginPollingClusterFormationInfo();
} else {
cancelPollingClusterFormationInfo();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we shouldn't be constantly calling this cancel... when the cluster is healthy, but only when re-gaining a master.
@DaveCTurner what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a huge deal. Maybe we could use null instead of new ConcurrentHashMap<>() to distinguish "we're not polling" from "we're polling but have no entries"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking was that it's basically a no-op (there won't be any scheduled tasks most of the time), and the miniscule performance hit was worth the risk of complicating the code and/or accidentally not calling it when it needed to be called.
I can have cancelPollingClusterFormationInfo set clusterFormationResponses to null and have beginPollingClusterFormationInfo create a new ConcurrentHashMap<>() (I think that's what you're suggesting?) -- that would save a little garbage collection.

}
}

/**
* This method begins polling all known master-eligible nodes for cluster formation information. After a 10-second initial delay, it
* polls each node every 10 seconds until cancelPollingClusterFormationInfo() is called.
*/
private void beginPollingClusterFormationInfo() {
assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
cancelPollingClusterFormationInfo();
ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> responses = new ConcurrentHashMap<>();
List<Scheduler.Cancellable> cancellables = new CopyOnWriteArrayList<>();
beginPollingClusterFormationInfo(getMasterEligibleNodes(), responses::put, cancellables::add);
clusterFormationResponses = responses;
clusterFormationInfoTasks = cancellables;
}

/**
* This method returns quickly, but in the background schedules to query the remote node's cluster formation state in 10 seconds, and
* repeats doing that until cancel() is called on all of the Cancellable that this method inserts into cancellables. This method
* exists (rather than being just part of the beginPollingClusterFormationInfo() above) in order to facilitate unit testing.
* @param nodeResponseConsumer A consumer for any results produced for a node by this method
* @param cancellableConsumer A consumer for any Cancellable tasks produced by this method
*/
// Non-private for testing
void beginPollingClusterFormationInfo(
Collection<DiscoveryNode> masterEligibleNodes,
BiConsumer<DiscoveryNode, ClusterFormationStateOrException> nodeResponseConsumer,
Consumer<Scheduler.Cancellable> cancellableConsumer
) {
masterEligibleNodes.forEach(masterEligibleNode -> {
Consumer<ClusterFormationStateOrException> responseConsumer = result -> nodeResponseConsumer.accept(masterEligibleNode, result);
cancellableConsumer.accept(
fetchClusterFormationInfo(
masterEligibleNode,
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
)
);
});
}

/**
* This wraps the responseConsumer in a Consumer that will run beginPollingClusterFormationInfo() after responseConsumer has
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will run fetchClusterFormationInfo I believe?

* completed, adding the resulting Cancellable to cancellableConsumer.
* @param masterEligibleNode The node being polled
* @param responseConsumer The response consumer to be wrapped
* @param cancellableConsumer The list of Cancellables
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not accurate anymore

* @return
*/
private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> rescheduleFetchConsumer(
DiscoveryNode masterEligibleNode,
Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> responseConsumer,
Consumer<Scheduler.Cancellable> cancellableConsumer
) {
return response -> {
cancellableConsumer.accept(
fetchClusterFormationInfo(
masterEligibleNode,
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
)
);
};
}

private void cancelPollingClusterFormationInfo() {
assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
if (clusterFormationResponses != null) {
clusterFormationInfoTasks.forEach(Scheduler.Cancellable::cancel);
clusterFormationResponses = null;
clusterFormationInfoTasks = null;
}
}

/**
* This method returns quickly, but in the background schedules to query the remote node's cluster formation state in 10 seconds
* unless cancel() is called on the Cancellable that this method returns.
* @param node The node to poll for cluster formation information
* @param responseConsumer The consumer of the cluster formation info for the node, or the exception encountered while contacting it
* @return A Cancellable for the task that is scheduled to fetch cluster formation information
*/
private Scheduler.Cancellable fetchClusterFormationInfo(
DiscoveryNode node,
Consumer<ClusterFormationStateOrException> responseConsumer
) {
StepListener<Releasable> connectionListener = new StepListener<>();
StepListener<ClusterFormationInfoAction.Response> fetchClusterInfoListener = new StepListener<>();
long startTime = System.nanoTime();
connectionListener.whenComplete(releasable -> {
logger.trace("Opened connection to {}, making cluster coordination info request", node);
// If we don't get a response in 10 seconds that is a failure worth capturing on its own:
final TimeValue transportTimeout = TimeValue.timeValueSeconds(10);
transportService.sendRequest(
node,
ClusterFormationInfoAction.NAME,
new ClusterFormationInfoAction.Request(),
TransportRequestOptions.timeout(transportTimeout),
new ActionListenerResponseHandler<>(
ActionListener.runBefore(fetchClusterInfoListener, () -> Releasables.close(releasable)),
ClusterFormationInfoAction.Response::new
)
);
}, e -> {
logger.warn("Exception connecting to master node", e);
responseConsumer.accept(new ClusterFormationStateOrException(e));
});

fetchClusterInfoListener.whenComplete(response -> {
long endTime = System.nanoTime();
logger.trace("Received cluster coordination info from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
responseConsumer.accept(new ClusterFormationStateOrException(response.getClusterFormationState()));
}, e -> {
logger.warn("Exception in cluster coordination info request to master node", e);
responseConsumer.accept(new ClusterFormationStateOrException(e));
});

return transportService.getThreadPool().schedule(() -> {
Version minSupportedVersion = Version.V_8_4_0;
if (node.getVersion().onOrAfter(minSupportedVersion) == false) { // This was introduced in 8.4.0
logger.trace(
"Cannot get cluster coordination info for {} because it is at version {} and {} is required",
node,
node.getVersion(),
minSupportedVersion
);
} else {
transportService.connectToNode(
// Note: This connection must be explicitly closed in the connectionListener
node,
ConnectionProfile.buildDefaultConnectionProfile(clusterService.getSettings()),
connectionListener
);
}
}, new TimeValue(10, TimeUnit.SECONDS), ThreadPool.Names.SAME);
}

// Non-private for testing
record ClusterFormationStateOrException(
ClusterFormationFailureHelper.ClusterFormationState clusterFormationState,
Exception exception
) {
ClusterFormationStateOrException {
if (clusterFormationState != null && exception != null) {
throw new IllegalArgumentException("Cluster formation state and exception cannot both be non-null");
}
}

ClusterFormationStateOrException(ClusterFormationFailureHelper.ClusterFormationState clusterFormationState) {
this(clusterFormationState, null);
}

ClusterFormationStateOrException(Exception exception) {
this(null, exception);
}
}

public record CoordinationDiagnosticsResult(
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ protected Node(
MasterHistoryService masterHistoryService = new MasterHistoryService(transportService, threadPool, clusterService);
CoordinationDiagnosticsService coordinationDiagnosticsService = new CoordinationDiagnosticsService(
clusterService,
transportService,
discoveryModule.getCoordinator(),
masterHistoryService
);
Expand Down
Loading