diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java index 9890ee65ddc3c..033015b48c5ea 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java @@ -30,7 +30,6 @@ public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask { private static final Logger logger = LogManager.getLogger(ExecuteStepsUpdateTask.class); private final String policy; - private final Index index; private final Step startStep; private final PolicyStepsRegistry policyStepsRegistry; private final IndexLifecycleRunner lifecycleRunner; @@ -40,8 +39,8 @@ public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry, IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) { + super(index, startStep.getKey()); this.policy = policy; - this.index = index; this.startStep = startStep; this.policyStepsRegistry = policyStepsRegistry; this.nowSupplier = nowSupplier; @@ -52,10 +51,6 @@ String getPolicy() { return policy; } - Index getIndex() { - return index; - } - Step getStartStep() { return startStep; } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java index a3b5622a5670c..419b76f58727a 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java @@ -11,6 +11,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.ilm.Step; /** * Base class for index lifecycle cluster state update tasks that requires implementing {@code equals} and {@code hashCode} to allow @@ -20,6 +22,23 @@ public abstract class IndexLifecycleClusterStateUpdateTask extends ClusterStateU private final ListenableFuture listener = new ListenableFuture<>(); + protected final Index index; + + protected final Step.StepKey currentStepKey; + + protected IndexLifecycleClusterStateUpdateTask(Index index, Step.StepKey currentStepKey) { + this.index = index; + this.currentStepKey = currentStepKey; + } + + final Index getIndex() { + return index; + } + + final Step.StepKey getCurrentStepKey() { + return currentStepKey; + } + @Override public final void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { listener.onResponse(null); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index d82437691b916..f1cf2f0c0df1d 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ilm.AsyncActionStep; @@ -331,6 +332,11 @@ public void onFailure(Exception e) { void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) { String index = indexMetadata.getIndex().getName(); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetadata); + final StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState); + if (busyIndices.contains(Tuple.tuple(indexMetadata.getIndex(), currentStepKey))) { + // try later again, already doing work for this index at this step, no need to check for more work yet + return; + } final Step currentStep; try { currentStep = getCurrentStep(stepRegistry, policy, indexMetadata, lifecycleState); @@ -343,7 +349,6 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) { markPolicyDoesNotExist(policy, indexMetadata.getIndex(), lifecycleState); return; } else { - Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState); if (TerminalPolicyStep.KEY.equals(currentStepKey)) { // This index is a leftover from before we halted execution on the final phase // instead of going to the completed phase, so it's okay to ignore this index @@ -511,6 +516,12 @@ void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) { private final Set executingTasks = Collections.synchronizedSet(new HashSet<>()); + /** + * Set of all index and current step key combinations that have an in-flight cluster state update at the moment. Used to not inspect + * indices that are already executing an update at their current step on cluster state update thread needlessly. + */ + private final Set> busyIndices = Collections.synchronizedSet(new HashSet<>()); + /** * Tracks already executing {@link IndexLifecycleClusterStateUpdateTask} tasks in {@link #executingTasks} to prevent queueing up * duplicate cluster state updates. @@ -522,8 +533,12 @@ void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) { */ private void submitUnlessAlreadyQueued(String source, IndexLifecycleClusterStateUpdateTask task) { if (executingTasks.add(task)) { + final Tuple dedupKey = Tuple.tuple(task.index, task.currentStepKey); + // index+step-key combination on a best-effort basis to skip checking for more work for an index on CS application + busyIndices.add(dedupKey); task.addListener(ActionListener.wrap(() -> { final boolean removed = executingTasks.remove(task); + busyIndices.remove(dedupKey); assert removed : "tried to unregister unknown task [" + task + "]"; })); clusterService.submitStateUpdateTask(source, task); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java index 03efd7b2c3150..0fe21a683648c 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java @@ -24,9 +24,7 @@ public class MoveToNextStepUpdateTask extends IndexLifecycleClusterStateUpdateTask { private static final Logger logger = LogManager.getLogger(MoveToNextStepUpdateTask.class); - private final Index index; private final String policy; - private final Step.StepKey currentStepKey; private final Step.StepKey nextStepKey; private final LongSupplier nowSupplier; private final PolicyStepsRegistry stepRegistry; @@ -35,9 +33,8 @@ public class MoveToNextStepUpdateTask extends IndexLifecycleClusterStateUpdateTa public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey nextStepKey, LongSupplier nowSupplier, PolicyStepsRegistry stepRegistry, Consumer stateChangeConsumer) { - this.index = index; + super(index, currentStepKey); this.policy = policy; - this.currentStepKey = currentStepKey; this.nextStepKey = nextStepKey; this.nowSupplier = nowSupplier; this.stepRegistry = stepRegistry; diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java index 5d92d3a1038be..1de4cbc39e9a1 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java @@ -28,30 +28,19 @@ public class SetStepInfoUpdateTask extends IndexLifecycleClusterStateUpdateTask private static final Logger logger = LogManager.getLogger(SetStepInfoUpdateTask.class); - private final Index index; private final String policy; - private final Step.StepKey currentStepKey; private final ToXContentObject stepInfo; public SetStepInfoUpdateTask(Index index, String policy, Step.StepKey currentStepKey, ToXContentObject stepInfo) { - this.index = index; + super(index, currentStepKey); this.policy = policy; - this.currentStepKey = currentStepKey; this.stepInfo = stepInfo; } - Index getIndex() { - return index; - } - String getPolicy() { return policy; } - Step.StepKey getCurrentStepKey() { - return currentStepKey; - } - ToXContentObject getStepInfo() { return stepInfo; }