Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check valid cluster service state transitions #21538

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
Expand Down Expand Up @@ -123,7 +124,7 @@ public class ClusterService extends AbstractLifecycleComponent {

private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();

private final AtomicReference<ClusterServiceState> state = new AtomicReference<>();
private final AtomicReference<ClusterServiceState> state;

private final ClusterBlocks.Builder initialBlocks;

Expand All @@ -137,7 +138,7 @@ public ClusterService(Settings settings,
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
// will be replaced on doStart.
this.state.set(new ClusterServiceState(ClusterState.builder(clusterName).build(), ClusterStateStatus.UNKNOWN));
this.state = new AtomicReference<>(new ClusterServiceState(ClusterState.builder(clusterName).build(), ClusterStateStatus.UNKNOWN));

this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
Expand All @@ -158,14 +159,45 @@ public synchronized void setClusterStatePublisher(BiConsumer<ClusterChangedEvent
}

/**
 * Registers {@code localNode} as the local node of the current cluster state.
 * The node is added to the existing {@code DiscoveryNodes}, its id becomes the local node id,
 * and the cluster state status is carried over unchanged.
 *
 * With assertions enabled, fails if a local node id is already present.
 *
 * NOTE(review): this diff hunk shows the assert and the update call twice; the first of each
 * pair appears to be the removed line and the second the added one - confirm against the merged file.
 *
 * @param localNode the node to record as local
 */
public synchronized void setLocalNode(DiscoveryNode localNode) {
assert state.get().getClusterState().nodes().getLocalNodeId() == null : "local node is already set";
this.state.getAndUpdate(css -> {
assert clusterServiceState().getClusterState().nodes().getLocalNodeId() == null : "local node is already set";
updateState(css -> {
ClusterState clusterState = css.getClusterState();
// rebuild the node set with the local node added and flagged as local
DiscoveryNodes nodes = DiscoveryNodes.builder(clusterState.nodes()).add(localNode).localNodeId(localNode.getId()).build();
// new ClusterState instance, same status as before
return new ClusterServiceState(ClusterState.builder(clusterState).nodes(nodes).build(), css.getClusterStateStatus());
});
}

/**
 * Replaces the held cluster service state with the result of applying {@code updateFunction}
 * to the current value, using the atomic reference's update loop.
 *
 * With assertions enabled, every computed transition is checked with
 * {@code validStateTransition} and an invalid one trips the assertion.
 *
 * @param updateFunction computes the next state from the current one
 */
private void updateState(UnaryOperator<ClusterServiceState> updateFunction) {
    this.state.getAndUpdate(current -> {
        final ClusterServiceState next = updateFunction.apply(current);
        assert validStateTransition(current, next) :
            "Invalid cluster service state transition from " + current + " to " + next;
        return next;
    });
}

/**
 * Decides whether moving from {@code oldClusterServiceState} to {@code newClusterServiceState}
 * is a permitted transition.
 *
 * Status rules:
 * <ul>
 *   <li>UNKNOWN may not move directly to APPLIED</li>
 *   <li>BEING_APPLIED may only move to APPLIED</li>
 *   <li>APPLIED may only move to BEING_APPLIED</li>
 * </ul>
 * In addition, the two states must reference the very same {@code ClusterState} instance
 * exactly when the transition is BEING_APPLIED to APPLIED; every other transition must
 * carry a different instance.
 *
 * @return {@code true} if the transition is allowed; {@code false} otherwise, including when
 *         either argument is {@code null}
 */
private static boolean validStateTransition(ClusterServiceState oldClusterServiceState, ClusterServiceState newClusterServiceState) {
    if (oldClusterServiceState == null || newClusterServiceState == null) {
        return false;
    }
    final ClusterStateStatus from = oldClusterServiceState.getClusterStateStatus();
    final ClusterStateStatus to = newClusterServiceState.getClusterStateStatus();

    // The source status selects exactly one rule; any other source status is unrestricted here.
    final boolean statusAllowed;
    if (from == ClusterStateStatus.UNKNOWN) {
        statusAllowed = to != ClusterStateStatus.APPLIED;
    } else if (from == ClusterStateStatus.BEING_APPLIED) {
        statusAllowed = to == ClusterStateStatus.APPLIED;
    } else if (from == ClusterStateStatus.APPLIED) {
        statusAllowed = to == ClusterStateStatus.BEING_APPLIED;
    } else {
        statusAllowed = true;
    }
    if (!statusAllowed) {
        return false;
    }

    // Reference equality on purpose: finishing an application keeps the instance, everything else swaps it.
    final boolean sameInstance = oldClusterServiceState.getClusterState() == newClusterServiceState.getClusterState();
    final boolean finishingApplication = from == ClusterStateStatus.BEING_APPLIED && to == ClusterStateStatus.APPLIED;
    return sameInstance == finishingApplication;
}

public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
assert this.nodeConnectionsService == null : "nodeConnectionsService is already set";
this.nodeConnectionsService = nodeConnectionsService;
Expand Down Expand Up @@ -201,10 +233,10 @@ public synchronized void removeInitialStateBlock(int blockId) throws IllegalStat
@Override
protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(state.get().getClusterState().nodes().getLocalNode(), "please set the local node before starting");
Objects.requireNonNull(clusterServiceState().getClusterState().nodes().getLocalNode(), "please set the local node before starting");
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
add(localNodeMasterListeners);
this.state.getAndUpdate(css -> new ClusterServiceState(
updateState(css -> new ClusterServiceState(
ClusterState.builder(css.getClusterState()).blocks(initialBlocks).build(),
css.getClusterStateStatus()));
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME),
Expand Down Expand Up @@ -240,7 +272,7 @@ protected synchronized void doClose() {
* The local node.
*/
public DiscoveryNode localNode() {
DiscoveryNode localNode = state.get().getClusterState().getNodes().getLocalNode();
DiscoveryNode localNode = state().getNodes().getLocalNode();
if (localNode == null) {
throw new IllegalStateException("No local node found. Is the node started?");
}
Expand All @@ -255,7 +287,7 @@ public OperationRouting operationRouting() {
* The current cluster state.
*/
public ClusterState state() {
// NOTE(review): the two return statements below look like the removed/added pair of this diff hunk;
// both yield the ClusterState of the currently held cluster service state - confirm which one is live.
return this.state.get().getClusterState();
return clusterServiceState().getClusterState();
}

/**
Expand Down Expand Up @@ -554,7 +586,7 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
return;
}
logger.debug("processing [{}]: execute", tasksSummary);
ClusterState previousClusterState = state.get().getClusterState();
ClusterState previousClusterState = clusterServiceState().getClusterState();
if (!previousClusterState.nodes().isLocalNodeElectedMaster() && executor.runOnlyOnMaster()) {
logger.debug("failing [{}]: local node is no longer master", tasksSummary);
toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
Expand Down Expand Up @@ -704,7 +736,8 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
}

// update the current cluster state
state.set(new ClusterServiceState(newClusterState, ClusterStateStatus.BEING_APPLIED));
ClusterState finalNewClusterState = newClusterState;
updateState(css -> new ClusterServiceState(finalNewClusterState, ClusterStateStatus.BEING_APPLIED));
logger.debug("set local cluster state to version {}", newClusterState.version());
try {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
Expand All @@ -725,7 +758,7 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {

nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes());

state.getAndUpdate(css -> new ClusterServiceState(css.getClusterState(), ClusterStateStatus.APPLIED));
updateState(css -> new ClusterServiceState(css.getClusterState(), ClusterStateStatus.APPLIED));

for (ClusterStateListener listener : postAppliedListeners) {
try {
Expand Down