Skip to content

Commit

Permalink
Run ILM and SLM stopping cluster state updates at IMMEDIATE priority (#80207) (#80559)
Browse files Browse the repository at this point in the history

Due to wrapping of the tasks, the update tasks could be executed at `NORMAL` priority by mistake.
This ensures they are run at the `IMMEDIATE` priority. It also adds logging when the cluster state
update has been processed so that an administrator can see what the state has been changed to.

Resolves #80099
  • Loading branch information
dakrone authored Nov 9, 2021
1 parent 3f4c696 commit 2b51efe
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -237,7 +236,10 @@ void onMaster(ClusterState clusterState) {
}

if (safeToStop && OperationMode.STOPPING == currentMode) {
submitOperationModeUpdate(OperationMode.STOPPED);
clusterService.submitStateUpdateTask(
"ilm_operation_mode_update[stopped]",
OperationModeUpdateTask.ilmMode(OperationMode.STOPPED)
);
}
}
}
Expand Down Expand Up @@ -437,7 +439,10 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange)
}

if (safeToStop && OperationMode.STOPPING == currentMode) {
submitOperationModeUpdate(OperationMode.STOPPED);
clusterService.submitStateUpdateTask(
"ilm_operation_mode_update[stopped]",
OperationModeUpdateTask.ilmMode(OperationMode.STOPPED)
);
}
}

Expand All @@ -453,16 +458,6 @@ assert isClusterServiceStoppedOrClosed()
}
}

public void submitOperationModeUpdate(OperationMode mode) {
OperationModeUpdateTask ilmOperationModeUpdateTask;
if (mode == OperationMode.STOPPING || mode == OperationMode.STOPPED) {
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.IMMEDIATE, mode);
} else {
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.NORMAL, mode);
}
clusterService.submitStateUpdateTask("ilm_operation_mode_update {OperationMode " + mode.name() + "}", ilmOperationModeUpdateTask);
}

/**
* Method that checks if the lifecycle state of the cluster service is stopped or closed. This
* enhances the readability of the code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;

import java.util.Objects;

/**
* This task updates the operation mode state for ILM.
*
Expand All @@ -30,22 +36,39 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
@Nullable
private final OperationMode slmMode;

/**
 * Adapts an {@link OperationModeUpdateTask} into an {@link AckedClusterStateUpdateTask} so that it can be
 * submitted together with a request and a listener.
 * <p>
 * The wrapper is constructed with the priority reported by {@code task.priority()}, and its
 * {@code execute} forwards to {@code task.execute}. Only {@code execute} is forwarded here; no other
 * method of {@code task} is invoked by the wrapper.
 *
 * @param task     the operation mode update whose priority and state transformation are reused
 * @param request  the request handed to the acknowledged task constructor
 * @param listener the listener handed to the acknowledged task constructor
 * @return an acknowledged task that delegates its state computation to {@code task}
 */
public static AckedClusterStateUpdateTask wrap(
    OperationModeUpdateTask task,
    AckedRequest request,
    ActionListener<AcknowledgedResponse> listener
) {
    // Resolve the priority once, up front, from the task being wrapped.
    final Priority delegatePriority = task.priority();
    return new AckedClusterStateUpdateTask(delegatePriority, request, listener) {
        @Override
        public ClusterState execute(ClusterState currentState) {
            final ClusterState updatedState = task.execute(currentState);
            return updatedState;
        }
    };
}

/**
 * Creates a task holding the requested ILM and SLM operation modes.
 * <p>
 * Private: the static factory methods of this class call it, passing {@code null} for the mode that is
 * not being changed.
 *
 * @param priority the priority forwarded to the superclass constructor
 * @param ilmMode  the value stored in the {@code ilmMode} field
 * @param slmMode  the value stored in the {@code slmMode} field
 */
private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, OperationMode slmMode) {
super(priority);
this.ilmMode = ilmMode;
this.slmMode = slmMode;
}

public static OperationModeUpdateTask ilmMode(OperationMode mode) {
return ilmMode(Priority.NORMAL, mode);
return new OperationModeUpdateTask(getPriority(mode), mode, null);
}

public static OperationModeUpdateTask ilmMode(Priority priority, OperationMode mode) {
return new OperationModeUpdateTask(priority, mode, null);
public static OperationModeUpdateTask slmMode(OperationMode mode) {
return new OperationModeUpdateTask(getPriority(mode), null, mode);
}

public static OperationModeUpdateTask slmMode(OperationMode mode) {
return new OperationModeUpdateTask(Priority.NORMAL, null, mode);
/**
 * Chooses the priority for an operation mode update.
 *
 * @param mode the operation mode being requested
 * @return {@link Priority#IMMEDIATE} when {@code mode} is {@link OperationMode#STOPPING} or
 *         {@link OperationMode#STOPPED}; {@link Priority#NORMAL} for every other value
 */
private static Priority getPriority(OperationMode mode) {
    // Reference comparison on purpose: any value that is not one of the two stop modes falls through to NORMAL.
    final boolean isStopTransition = mode == OperationMode.STOPPING || mode == OperationMode.STOPPED;
    return isStopTransition ? Priority.IMMEDIATE : Priority.NORMAL;
}

OperationMode getILMOperationMode() {
Expand Down Expand Up @@ -129,4 +152,33 @@ private ClusterState updateSLMState(final ClusterState currentState) {
public void onFailure(String source, Exception e) {
logger.error("unable to update lifecycle metadata with new ilm mode [" + ilmMode + "], slm mode [" + slmMode + "]", e);
}

/**
 * Logs, at info level, each operation mode carried by this task: one line for the ILM mode and one line
 * for the SLM mode, skipping whichever of the two is {@code null}.
 *
 * @param source   not used by this implementation
 * @param oldState not used by this implementation
 * @param newState not used by this implementation
 */
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    final OperationMode appliedIlmMode = ilmMode;
    final OperationMode appliedSlmMode = slmMode;

    // ILM is reported before SLM.
    if (appliedIlmMode != null) {
        logger.info("ILM operation mode updated to {}", appliedIlmMode);
    }
    if (appliedSlmMode != null) {
        logger.info("SLM operation mode updated to {}", appliedSlmMode);
    }
}

/**
 * Computes a hash from the same three values that {@code equals} compares: the task priority, the ILM
 * mode and the SLM mode.
 * <p>
 * {@code super.hashCode()} is deliberately not mixed in. {@code equals} never consults the superclass, so
 * including a superclass hash (an identity hash, if the superclass inherits it from {@code Object}) would
 * let two equal tasks report different hash codes and break the equals/hashCode contract.
 *
 * @return a hash code consistent with {@code equals}
 */
@Override
public int hashCode() {
    return Objects.hash(priority(), ilmMode, slmMode);
}

/**
 * Two tasks are equal when they are of exactly the same class and agree on priority, ILM mode and SLM mode.
 *
 * @param obj the object to compare with; may be {@code null}
 * @return {@code true} only if {@code obj} is a non-null instance of this exact class with equal
 *         priority, ILM mode and SLM mode
 */
@Override
public boolean equals(Object obj) {
    // Exact class match (not instanceof), so subclasses never compare equal to this class.
    if (obj == null || obj.getClass() != getClass()) {
        return false;
    }
    final OperationModeUpdateTask that = (OperationModeUpdateTask) obj;
    if (Objects.equals(priority(), that.priority()) == false) {
        return false;
    }
    if (Objects.equals(ilmMode, that.ilmMode) == false) {
        return false;
    }
    return Objects.equals(slmMode, that.slmMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -50,12 +49,10 @@ public TransportStartILMAction(

@Override
protected void masterOperation(Task task, StartILMRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask("ilm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
return (OperationModeUpdateTask.ilmMode(OperationMode.RUNNING)).execute(currentState);
}
});
clusterService.submitStateUpdateTask(
"ilm_operation_mode_update[running]",
OperationModeUpdateTask.wrap(OperationModeUpdateTask.ilmMode(OperationMode.RUNNING), request, listener)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -52,13 +50,8 @@ public TransportStopILMAction(
@Override
protected void masterOperation(Task task, StopILMRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask(
"ilm_operation_mode_update",
new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState);
}
}
"ilm_operation_mode_update[stopping]",
OperationModeUpdateTask.wrap(OperationModeUpdateTask.ilmMode(OperationMode.STOPPING), request, listener)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ public void clusterChanged(final ClusterChangedEvent event) {
cancelSnapshotJobs();
}
if (slmStopping(state)) {
submitOperationModeUpdate(OperationMode.STOPPED);
clusterService.submitStateUpdateTask(
"slm_operation_mode_update[stopped]",
OperationModeUpdateTask.slmMode(OperationMode.STOPPED)
);
}
return;
}
Expand Down Expand Up @@ -130,10 +133,6 @@ static boolean slmStopping(ClusterState state) {
.orElse(false);
}

public void submitOperationModeUpdate(OperationMode mode) {
clusterService.submitStateUpdateTask("slm_operation_mode_update", OperationModeUpdateTask.slmMode(mode));
}

/**
* Schedule all non-scheduled snapshot jobs contained in the cluster state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -54,12 +53,10 @@ protected void masterOperation(
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) {
clusterService.submitStateUpdateTask("slm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
return (OperationModeUpdateTask.slmMode(OperationMode.RUNNING)).execute(currentState);
}
});
clusterService.submitStateUpdateTask(
"slm_operation_mode_update[running]",
OperationModeUpdateTask.wrap(OperationModeUpdateTask.slmMode(OperationMode.RUNNING), request, listener)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -54,12 +53,10 @@ protected void masterOperation(
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) {
clusterService.submitStateUpdateTask("slm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
return (OperationModeUpdateTask.slmMode(OperationMode.STOPPING)).execute(currentState);
}
});
clusterService.submitStateUpdateTask(
"slm_operation_mode_update[stopping]",
OperationModeUpdateTask.wrap(OperationModeUpdateTask.slmMode(OperationMode.STOPPING), request, listener)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -52,7 +51,6 @@
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

import java.time.Clock;
Expand All @@ -74,11 +72,9 @@
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class IndexLifecycleServiceTests extends ESTestCase {
Expand Down Expand Up @@ -287,7 +283,7 @@ private void verifyCanStopWithStep(String stoppableStep) {
changedOperationMode.set(true);
return null;
}).when(clusterService)
.submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"), any(OperationModeUpdateTask.class));
.submitStateUpdateTask(eq("ilm_operation_mode_update[stopped]"), eq(OperationModeUpdateTask.ilmMode(OperationMode.STOPPED)));
indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, true);
assertTrue(changedOperationMode.get());
Expand Down Expand Up @@ -345,8 +341,7 @@ public void testRequestedStopOnSafeAction() {
assertThat(task.getILMOperationMode(), equalTo(OperationMode.STOPPED));
moveToMaintenance.set(true);
return null;
}).when(clusterService)
.submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"), any(OperationModeUpdateTask.class));
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update[stopped]"), any(OperationModeUpdateTask.class));

indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
Expand All @@ -362,31 +357,6 @@ public void testExceptionStillProcessesOtherIndicesOnMaster() {
doTestExceptionStillProcessesOtherIndices(true);
}

public void testOperationModeUpdateTaskPriority() {
indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPING);
verifyOperationModeUpdateTaskPriority(OperationMode.STOPPING, Priority.IMMEDIATE);
indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPED);
verifyOperationModeUpdateTaskPriority(OperationMode.STOPPED, Priority.IMMEDIATE);
indexLifecycleService.submitOperationModeUpdate(OperationMode.RUNNING);
verifyOperationModeUpdateTaskPriority(OperationMode.RUNNING, Priority.NORMAL);
}

private void verifyOperationModeUpdateTaskPriority(OperationMode mode, Priority expectedPriority) {
verify(clusterService).submitStateUpdateTask(
Mockito.eq("ilm_operation_mode_update {OperationMode " + mode.name() + "}"),
argThat(new ArgumentMatcher<OperationModeUpdateTask>() {

Priority actualPriority = null;

@Override
public boolean matches(OperationModeUpdateTask other) {
actualPriority = other.priority();
return actualPriority == expectedPriority;
}
})
);
}

@SuppressWarnings("unchecked")
public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
String policy1 = randomAlphaOfLengthBetween(1, 20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testStopILMClusterStatePriorityIsImmediate() {
transportStopILMAction.masterOperation(task, request, ClusterState.EMPTY_STATE, EMPTY_LISTENER);

verify(clusterService).submitStateUpdateTask(
eq("ilm_operation_mode_update"),
eq("ilm_operation_mode_update[stopping]"),
argThat(new ArgumentMatcher<AckedClusterStateUpdateTask>() {

Priority actualPriority = null;
Expand Down
Loading

0 comments on commit 2b51efe

Please sign in to comment.