diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index 9d5c19fcb0af7..5aca0fe24e2bc 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.Lifecycle.State; import org.elasticsearch.common.settings.Settings; @@ -237,7 +236,10 @@ void onMaster(ClusterState clusterState) { } if (safeToStop && OperationMode.STOPPING == currentMode) { - submitOperationModeUpdate(OperationMode.STOPPED); + clusterService.submitStateUpdateTask( + "ilm_operation_mode_update[stopped]", + OperationModeUpdateTask.ilmMode(OperationMode.STOPPED) + ); } } } @@ -437,7 +439,10 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) } if (safeToStop && OperationMode.STOPPING == currentMode) { - submitOperationModeUpdate(OperationMode.STOPPED); + clusterService.submitStateUpdateTask( + "ilm_operation_mode_update[stopped]", + OperationModeUpdateTask.ilmMode(OperationMode.STOPPED) + ); } } @@ -453,16 +458,6 @@ assert isClusterServiceStoppedOrClosed() } } - public void submitOperationModeUpdate(OperationMode mode) { - OperationModeUpdateTask ilmOperationModeUpdateTask; - if (mode == OperationMode.STOPPING || mode == OperationMode.STOPPED) { - ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.IMMEDIATE, mode); - } else { - ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.NORMAL, mode); - } - clusterService.submitStateUpdateTask("ilm_operation_mode_update {OperationMode " + mode.name() + "}", ilmOperationModeUpdateTask); - } - /** * Method that checks if the lifecycle state of the cluster service is stopped or closed. This * enhances the readability of the code. diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java index e21b13b1588e3..519517a1cdb39 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java @@ -8,8 +8,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.Priority; import org.elasticsearch.core.Nullable; @@ -17,6 +21,8 @@ import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata; +import java.util.Objects; + /** * This task updates the operation mode state for ILM. * @@ -30,6 +36,19 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask { @Nullable private final OperationMode slmMode; + public static AckedClusterStateUpdateTask wrap( + OperationModeUpdateTask task, + AckedRequest request, + ActionListener listener + ) { + return new AckedClusterStateUpdateTask(task.priority(), request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return task.execute(currentState); + } + }; + } + private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, OperationMode slmMode) { super(priority); this.ilmMode = ilmMode; @@ -37,15 +56,19 @@ private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, Operat } public static OperationModeUpdateTask ilmMode(OperationMode mode) { - return ilmMode(Priority.NORMAL, mode); + return new OperationModeUpdateTask(getPriority(mode), mode, null); } - public static OperationModeUpdateTask ilmMode(Priority priority, OperationMode mode) { - return new OperationModeUpdateTask(priority, mode, null); + public static OperationModeUpdateTask slmMode(OperationMode mode) { + return new OperationModeUpdateTask(getPriority(mode), null, mode); } - public static OperationModeUpdateTask slmMode(OperationMode mode) { - return new OperationModeUpdateTask(Priority.NORMAL, null, mode); + private static Priority getPriority(OperationMode mode) { + if (mode == OperationMode.STOPPED || mode == OperationMode.STOPPING) { + return Priority.IMMEDIATE; + } else { + return Priority.NORMAL; + } } OperationMode getILMOperationMode() { @@ -129,4 +152,33 @@ private ClusterState updateSLMState(final ClusterState currentState) { public void onFailure(String source, Exception e) { logger.error("unable to update lifecycle metadata with new ilm mode [" + ilmMode + "], slm mode [" + slmMode + "]", e); } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (ilmMode != null) { + logger.info("ILM operation mode updated to {}", ilmMode); + } + if (slmMode != null) { + logger.info("SLM operation mode updated to {}", slmMode); + } + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), ilmMode, slmMode); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + OperationModeUpdateTask other = (OperationModeUpdateTask) obj; + return Objects.equals(priority(), other.priority()) + && Objects.equals(ilmMode, other.ilmMode) + && Objects.equals(slmMode, other.slmMode); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java index 6e947ac486517..d5f8254f1f78e 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -50,12 +49,10 @@ public TransportStartILMAction( @Override protected void masterOperation(Task task, StartILMRequest request, ClusterState state, ActionListener listener) { - clusterService.submitStateUpdateTask("ilm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - return (OperationModeUpdateTask.ilmMode(OperationMode.RUNNING)).execute(currentState); - } - }); + clusterService.submitStateUpdateTask( + "ilm_operation_mode_update[running]", + OperationModeUpdateTask.wrap(OperationModeUpdateTask.ilmMode(OperationMode.RUNNING), request, listener) + ); } @Override diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java index 41c70e47e276b..00eb4176a8402 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java @@ -11,13 +11,11 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -52,13 +50,8 @@ public TransportStopILMAction( @Override protected void masterOperation(Task task, StopILMRequest request, ClusterState state, ActionListener listener) { clusterService.submitStateUpdateTask( - "ilm_operation_mode_update", - new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState); - } - } + "ilm_operation_mode_update[stopping]", + OperationModeUpdateTask.wrap(OperationModeUpdateTask.ilmMode(OperationMode.STOPPING), request, listener) ); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java index 7068993999958..3bae41bfa7ef7 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java @@ -95,7 +95,10 @@ public void clusterChanged(final ClusterChangedEvent event) { cancelSnapshotJobs(); } if (slmStopping(state)) { - submitOperationModeUpdate(OperationMode.STOPPED); + clusterService.submitStateUpdateTask( + "slm_operation_mode_update[stopped]", + OperationModeUpdateTask.slmMode(OperationMode.STOPPED) + ); } return; } @@ -130,10 +133,6 @@ static boolean slmStopping(ClusterState state) { .orElse(false); } - public void submitOperationModeUpdate(OperationMode mode) { - clusterService.submitStateUpdateTask("slm_operation_mode_update", OperationModeUpdateTask.slmMode(mode)); - } - /** * Schedule all non-scheduled snapshot jobs contained in the cluster state */ diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java index 4a1d9c7ffcabc..d80c4290d4a79 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -54,12 +53,10 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { - clusterService.submitStateUpdateTask("slm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - return (OperationModeUpdateTask.slmMode(OperationMode.RUNNING)).execute(currentState); - } - }); + clusterService.submitStateUpdateTask( + "slm_operation_mode_update[running]", + OperationModeUpdateTask.wrap(OperationModeUpdateTask.slmMode(OperationMode.RUNNING), request, listener) + ); } @Override diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java index c2ca65bc784fc..8d84f04fd12b1 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -54,12 +53,10 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { - clusterService.submitStateUpdateTask("slm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - return (OperationModeUpdateTask.slmMode(OperationMode.STOPPING)).execute(currentState); - } - }); + clusterService.submitStateUpdateTask( + "slm_operation_mode_update[stopping]", + OperationModeUpdateTask.wrap(OperationModeUpdateTask.slmMode(OperationMode.STOPPING), request, listener) + ); } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java index c25cb0039b9d4..b6c6a4255737d 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.Lifecycle.State; import org.elasticsearch.common.settings.ClusterSettings; @@ -52,7 +51,6 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.junit.After; import org.junit.Before; -import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import java.time.Clock; @@ -74,11 +72,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class IndexLifecycleServiceTests extends ESTestCase { @@ -287,7 +283,7 @@ private void verifyCanStopWithStep(String stoppableStep) { changedOperationMode.set(true); return null; }).when(clusterService) - .submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"), any(OperationModeUpdateTask.class)); + .submitStateUpdateTask(eq("ilm_operation_mode_update[stopped]"), eq(OperationModeUpdateTask.ilmMode(OperationMode.STOPPED))); indexLifecycleService.applyClusterState(event); indexLifecycleService.triggerPolicies(currentState, true); assertTrue(changedOperationMode.get()); @@ -345,8 +341,7 @@ public void testRequestedStopOnSafeAction() { assertThat(task.getILMOperationMode(), equalTo(OperationMode.STOPPED)); moveToMaintenance.set(true); return null; - }).when(clusterService) - .submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"), any(OperationModeUpdateTask.class)); + }).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update[stopped]"), any(OperationModeUpdateTask.class)); indexLifecycleService.applyClusterState(event); indexLifecycleService.triggerPolicies(currentState, randomBoolean()); @@ -362,31 +357,6 @@ public void testExceptionStillProcessesOtherIndicesOnMaster() { doTestExceptionStillProcessesOtherIndices(true); } - public void testOperationModeUpdateTaskPriority() { - indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPING); - verifyOperationModeUpdateTaskPriority(OperationMode.STOPPING, Priority.IMMEDIATE); - indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPED); - verifyOperationModeUpdateTaskPriority(OperationMode.STOPPED, Priority.IMMEDIATE); - indexLifecycleService.submitOperationModeUpdate(OperationMode.RUNNING); - verifyOperationModeUpdateTaskPriority(OperationMode.RUNNING, Priority.NORMAL); - } - - private void verifyOperationModeUpdateTaskPriority(OperationMode mode, Priority expectedPriority) { - verify(clusterService).submitStateUpdateTask( - Mockito.eq("ilm_operation_mode_update {OperationMode " + mode.name() + "}"), - argThat(new ArgumentMatcher() { - - Priority actualPriority = null; - - @Override - public boolean matches(OperationModeUpdateTask other) { - actualPriority = other.priority(); - return actualPriority == expectedPriority; - } - }) - ); - } - @SuppressWarnings("unchecked") public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) { String policy1 = randomAlphaOfLengthBetween(1, 20); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportStopILMActionTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportStopILMActionTests.java index bd269b4832aa1..9de257613bd24 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportStopILMActionTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportStopILMActionTests.java @@ -66,7 +66,7 @@ public void testStopILMClusterStatePriorityIsImmediate() { transportStopILMAction.masterOperation(task, request, ClusterState.EMPTY_STATE, EMPTY_LISTENER); verify(clusterService).submitStateUpdateTask( - eq("ilm_operation_mode_update"), + eq("ilm_operation_mode_update[stopping]"), argThat(new ArgumentMatcher() { Priority actualPriority = null; diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java index 1f6517af07ef9..4ff0ad3d0617e 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java @@ -7,16 +7,25 @@ package org.elasticsearch.xpack.slm; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.TimeValue; @@ -33,10 +42,13 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecycleStats; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; +import org.elasticsearch.xpack.ilm.OperationModeUpdateTask; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -431,6 +443,49 @@ public void testValidateMinimumInterval() { SnapshotLifecycleService.validateMinimumInterval(createPolicy("foo-1", "0/30 0/1 * * * ?"), validationDisabledState); } + public void testStoppedPriority() { + ClockMock clock = new ClockMock(); + ThreadPool threadPool = new TestThreadPool("name"); + ClusterSettings clusterSettings = new ClusterSettings( + Settings.EMPTY, + new HashSet<>( + Arrays.asList( + MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, + ClusterService.USER_DEFINED_METADATA, + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING + ) + ) + ); + final SetOnce task = new SetOnce<>(); + ClusterService fakeService = new ClusterService(Settings.EMPTY, clusterSettings, threadPool) { + @Override + public & ClusterStateTaskListener> void submitStateUpdateTask( + String source, + T updateTask + ) { + logger.info("--> got task: [source: {}]: {}", source, updateTask); + if (updateTask instanceof OperationModeUpdateTask) { + task.set((OperationModeUpdateTask) updateTask); + } + } + }; + + SnapshotLifecycleService service = new SnapshotLifecycleService( + Settings.EMPTY, + () -> new SnapshotLifecycleTask(null, null, null), + fakeService, + clock + ); + ClusterState state = createState( + new SnapshotLifecycleMetadata(Map.of(), OperationMode.STOPPING, new SnapshotLifecycleStats(0, 0, 0, 0, Map.of())), + true + ); + service.clusterChanged(new ClusterChangedEvent("blah", state, ClusterState.EMPTY_STATE)); + assertThat(task.get(), equalTo(OperationModeUpdateTask.slmMode(OperationMode.STOPPED))); + threadPool.shutdownNow(); + } + class FakeSnapshotTask extends SnapshotLifecycleTask { private final Consumer onTriggered; diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/action/TransportStopSLMActionTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/action/TransportStopSLMActionTests.java new file mode 100644 index 0000000000000..c759fa947fc03 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/action/TransportStopSLMActionTests.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.slm.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ilm.action.StopILMAction; +import org.elasticsearch.xpack.core.slm.action.StopSLMAction; +import org.mockito.ArgumentMatcher; + +import static java.util.Collections.emptyMap; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class TransportStopSLMActionTests extends ESTestCase { + + private static final ActionListener EMPTY_LISTENER = new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + + } + + @Override + public void onFailure(Exception e) { + + } + }; + + public void testStopILMClusterStatePriorityIsImmediate() { + ClusterService clusterService = mock(ClusterService.class); + + TransportStopSLMAction transportStopSLMAction = new TransportStopSLMAction( + mock(TransportService.class), + clusterService, + mock(ThreadPool.class), + mock(ActionFilters.class), + mock(IndexNameExpressionResolver.class) + ); + Task task = new Task( + randomLong(), + "transport", + StopILMAction.NAME, + "description", + new TaskId(randomLong() + ":" + randomLong()), + emptyMap() + ); + StopSLMAction.Request request = new StopSLMAction.Request(); + transportStopSLMAction.masterOperation(task, request, ClusterState.EMPTY_STATE, EMPTY_LISTENER); + + verify(clusterService).submitStateUpdateTask( + eq("slm_operation_mode_update[stopping]"), + argThat(new ArgumentMatcher() { + + Priority actualPriority = null; + + @Override + public boolean matches(AckedClusterStateUpdateTask other) { + actualPriority = other.priority(); + return actualPriority == Priority.IMMEDIATE; + } + }) + ); + } + +}