Skip to content

Commit

Permalink
Check valid cluster service state transitions (#21538)
Browse files Browse the repository at this point in the history
This commit adds assertions that check that the cluster service state only transitions in the ways we expect.

Relates to #21379.
  • Loading branch information
ywelsch authored and bleskes committed Dec 19, 2016
1 parent dba88b8 commit 3d2c5fb
Showing 1 changed file with 44 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
Expand Down Expand Up @@ -124,7 +125,7 @@ public class ClusterService extends AbstractLifecycleComponent {

private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();

private final AtomicReference<ClusterServiceState> state = new AtomicReference<>();
private final AtomicReference<ClusterServiceState> state;

private final ClusterBlocks.Builder initialBlocks;

Expand All @@ -138,7 +139,7 @@ public ClusterService(Settings settings,
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
// will be replaced on doStart.
this.state.set(new ClusterServiceState(ClusterState.builder(clusterName).build(), ClusterStateStatus.UNKNOWN));
this.state = new AtomicReference<>(new ClusterServiceState(ClusterState.builder(clusterName).build(), ClusterStateStatus.UNKNOWN));

this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
Expand All @@ -159,14 +160,45 @@ public synchronized void setClusterStatePublisher(BiConsumer<ClusterChangedEvent
}

public synchronized void setLocalNode(DiscoveryNode localNode) {
assert state.get().getClusterState().nodes().getLocalNodeId() == null : "local node is already set";
this.state.getAndUpdate(css -> {
assert clusterServiceState().getClusterState().nodes().getLocalNodeId() == null : "local node is already set";
updateState(css -> {
ClusterState clusterState = css.getClusterState();
DiscoveryNodes nodes = DiscoveryNodes.builder(clusterState.nodes()).add(localNode).localNodeId(localNode.getId()).build();
return new ClusterServiceState(ClusterState.builder(clusterState).nodes(nodes).build(), css.getClusterStateStatus());
});
}

/**
 * Atomically replaces the current {@link ClusterServiceState} with the result of applying
 * {@code updateFunction} to it, asserting that the resulting transition is a valid one.
 *
 * @param updateFunction computes the new cluster service state from the current one
 */
private void updateState(UnaryOperator<ClusterServiceState> updateFunction) {
    // Wrap the caller's function so that every state change is checked by validStateTransition
    // (only when assertions are enabled).
    final UnaryOperator<ClusterServiceState> checkedUpdate = previous -> {
        final ClusterServiceState next = updateFunction.apply(previous);
        assert validStateTransition(previous, next) :
            "Invalid cluster service state transition from " + previous + " to " + next;
        return next;
    };
    this.state.getAndUpdate(checkedUpdate);
}

/**
 * Checks whether moving from {@code oldClusterServiceState} to {@code newClusterServiceState}
 * is a permitted transition.
 * <p>
 * Rules enforced:
 * <ul>
 *   <li>neither state may be {@code null};</li>
 *   <li>{@code UNKNOWN} may not move directly to {@code APPLIED};</li>
 *   <li>{@code BEING_APPLIED} may only move to {@code APPLIED};</li>
 *   <li>{@code APPLIED} may only move to {@code BEING_APPLIED};</li>
 *   <li>the cluster state instance must stay the same (reference equality) when going from
 *       {@code BEING_APPLIED} to {@code APPLIED}, and must be a different instance for every
 *       other transition.</li>
 * </ul>
 *
 * @param oldClusterServiceState the state before the update
 * @param newClusterServiceState the state after the update
 * @return {@code true} if the transition satisfies all of the rules above
 */
private static boolean validStateTransition(ClusterServiceState oldClusterServiceState, ClusterServiceState newClusterServiceState) {
    if (oldClusterServiceState == null || newClusterServiceState == null) {
        return false;
    }

    final ClusterStateStatus from = oldClusterServiceState.getClusterStateStatus();
    final ClusterStateStatus to = newClusterServiceState.getClusterStateStatus();

    // the single transition during which the cluster state itself must not change
    final boolean finishingApplication = from == ClusterStateStatus.BEING_APPLIED && to == ClusterStateStatus.APPLIED;

    // UNKNOWN must not jump straight to APPLIED
    final boolean unknownToApplied = from == ClusterStateStatus.UNKNOWN && to == ClusterStateStatus.APPLIED;
    // BEING_APPLIED must be followed by APPLIED
    final boolean beingAppliedToOther = from == ClusterStateStatus.BEING_APPLIED && !finishingApplication;
    // APPLIED must be followed by BEING_APPLIED
    final boolean appliedToOther = from == ClusterStateStatus.APPLIED && to != ClusterStateStatus.BEING_APPLIED;
    if (unknownToApplied || beingAppliedToOther || appliedToOther) {
        return false;
    }

    // reference comparison on purpose: checks for the very same cluster state instance
    final boolean sameClusterState = oldClusterServiceState.getClusterState() == newClusterServiceState.getClusterState();
    return sameClusterState == finishingApplication;
}

public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
assert this.nodeConnectionsService == null : "nodeConnectionsService is already set";
this.nodeConnectionsService = nodeConnectionsService;
Expand Down Expand Up @@ -202,10 +234,10 @@ public synchronized void removeInitialStateBlock(int blockId) throws IllegalStat
@Override
protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(state.get().getClusterState().nodes().getLocalNode(), "please set the local node before starting");
Objects.requireNonNull(clusterServiceState().getClusterState().nodes().getLocalNode(), "please set the local node before starting");
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
add(localNodeMasterListeners);
this.state.getAndUpdate(css -> new ClusterServiceState(
updateState(css -> new ClusterServiceState(
ClusterState.builder(css.getClusterState()).blocks(initialBlocks).build(),
css.getClusterStateStatus()));
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME),
Expand Down Expand Up @@ -241,7 +273,7 @@ protected synchronized void doClose() {
* The local node.
*/
public DiscoveryNode localNode() {
DiscoveryNode localNode = state.get().getClusterState().getNodes().getLocalNode();
DiscoveryNode localNode = state().getNodes().getLocalNode();
if (localNode == null) {
throw new IllegalStateException("No local node found. Is the node started?");
}
Expand All @@ -256,7 +288,7 @@ public OperationRouting operationRouting() {
* The current cluster state.
*/
public ClusterState state() {
return this.state.get().getClusterState();
return clusterServiceState().getClusterState();
}

/**
Expand Down Expand Up @@ -580,7 +612,7 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
return;
}
logger.debug("processing [{}]: execute", tasksSummary);
ClusterState previousClusterState = state.get().getClusterState();
ClusterState previousClusterState = clusterServiceState().getClusterState();
if (!previousClusterState.nodes().isLocalNodeElectedMaster() && executor.runOnlyOnMaster()) {
logger.debug("failing [{}]: local node is no longer master", tasksSummary);
toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
Expand Down Expand Up @@ -730,7 +762,8 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
}

// update the current cluster state
state.set(new ClusterServiceState(newClusterState, ClusterStateStatus.BEING_APPLIED));
ClusterState finalNewClusterState = newClusterState;
updateState(css -> new ClusterServiceState(finalNewClusterState, ClusterStateStatus.BEING_APPLIED));
logger.debug("set local cluster state to version {}", newClusterState.version());
try {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
Expand All @@ -751,7 +784,7 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {

nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes());

state.getAndUpdate(css -> new ClusterServiceState(css.getClusterState(), ClusterStateStatus.APPLIED));
updateState(css -> new ClusterServiceState(css.getClusterState(), ClusterStateStatus.APPLIED));

for (ClusterStateListener listener : postAppliedListeners) {
try {
Expand Down

0 comments on commit 3d2c5fb

Please sign in to comment.