Skip to content

Commit

Permalink
Skip Inspecting Busy Indices on ILM CS Application (#78471) (#78496)
Browse files Browse the repository at this point in the history
If a cluster state (CS) update task is already enqueued for the current combination of index and current step,
there is no point in adding yet another task for this combination on the applier,
so we can skip the expensive inspection of the index.

Follow-up to #78390
  • Loading branch information
original-brownbear authored Sep 30, 2021
1 parent 43dd3a8 commit fcc6a9a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask {
private static final Logger logger = LogManager.getLogger(ExecuteStepsUpdateTask.class);
private final String policy;
private final Index index;
private final Step startStep;
private final PolicyStepsRegistry policyStepsRegistry;
private final IndexLifecycleRunner lifecycleRunner;
Expand All @@ -40,8 +39,8 @@ public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask

public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry,
IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) {
super(index, startStep.getKey());
this.policy = policy;
this.index = index;
this.startStep = startStep;
this.policyStepsRegistry = policyStepsRegistry;
this.nowSupplier = nowSupplier;
Expand All @@ -52,10 +51,6 @@ String getPolicy() {
return policy;
}

Index getIndex() {
return index;
}

/**
 * @return the step this task was constructed with as its starting step
 */
Step getStartStep() {
    return this.startStep;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.Step;

/**
* Base class for index lifecycle cluster state update tasks that requires implementing {@code equals} and {@code hashCode} to allow
Expand All @@ -20,6 +22,23 @@ public abstract class IndexLifecycleClusterStateUpdateTask extends ClusterStateU

private final ListenableFuture<Void> listener = new ListenableFuture<>();

protected final Index index;

protected final Step.StepKey currentStepKey;

/**
 * Stores the index and step key that identify this task.
 *
 * @param index          the index this task is associated with
 * @param currentStepKey the step key this task is associated with
 */
protected IndexLifecycleClusterStateUpdateTask(Index index, Step.StepKey currentStepKey) {
    this.currentStepKey = currentStepKey;
    this.index = index;
}

/**
 * @return the index supplied when this task was constructed
 */
final Index getIndex() {
    return this.index;
}

/**
 * @return the step key supplied when this task was constructed
 */
final Step.StepKey getCurrentStepKey() {
    return this.currentStepKey;
}

@Override
public final void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ilm.AsyncActionStep;
Expand Down Expand Up @@ -331,6 +332,11 @@ public void onFailure(Exception e) {
void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
String index = indexMetadata.getIndex().getName();
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetadata);
final StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState);
if (busyIndices.contains(Tuple.tuple(indexMetadata.getIndex(), currentStepKey))) {
// try later again, already doing work for this index at this step, no need to check for more work yet
return;
}
final Step currentStep;
try {
currentStep = getCurrentStep(stepRegistry, policy, indexMetadata, lifecycleState);
Expand All @@ -343,7 +349,6 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
markPolicyDoesNotExist(policy, indexMetadata.getIndex(), lifecycleState);
return;
} else {
Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState);
if (TerminalPolicyStep.KEY.equals(currentStepKey)) {
// This index is a leftover from before we halted execution on the final phase
// instead of going to the completed phase, so it's okay to ignore this index
Expand Down Expand Up @@ -511,6 +516,12 @@ void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {

private final Set<IndexLifecycleClusterStateUpdateTask> executingTasks = Collections.synchronizedSet(new HashSet<>());

/**
 * Set of all index and current step key combinations that currently have an in-flight cluster state update. It is used to avoid
 * needlessly inspecting, on the cluster state update thread, indices that are already executing an update at their current step.
 */
private final Set<Tuple<Index, StepKey>> busyIndices = Collections.synchronizedSet(new HashSet<>());

/**
* Tracks already executing {@link IndexLifecycleClusterStateUpdateTask} tasks in {@link #executingTasks} to prevent queueing up
* duplicate cluster state updates.
Expand All @@ -522,8 +533,12 @@ void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {
*/
private void submitUnlessAlreadyQueued(String source, IndexLifecycleClusterStateUpdateTask task) {
if (executingTasks.add(task)) {
final Tuple<Index, StepKey> dedupKey = Tuple.tuple(task.index, task.currentStepKey);
// index+step-key combination on a best-effort basis to skip checking for more work for an index on CS application
busyIndices.add(dedupKey);
task.addListener(ActionListener.wrap(() -> {
final boolean removed = executingTasks.remove(task);
busyIndices.remove(dedupKey);
assert removed : "tried to unregister unknown task [" + task + "]";
}));
clusterService.submitStateUpdateTask(source, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
public class MoveToNextStepUpdateTask extends IndexLifecycleClusterStateUpdateTask {
private static final Logger logger = LogManager.getLogger(MoveToNextStepUpdateTask.class);

private final Index index;
private final String policy;
private final Step.StepKey currentStepKey;
private final Step.StepKey nextStepKey;
private final LongSupplier nowSupplier;
private final PolicyStepsRegistry stepRegistry;
Expand All @@ -35,9 +33,8 @@ public class MoveToNextStepUpdateTask extends IndexLifecycleClusterStateUpdateTa
public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey nextStepKey,
LongSupplier nowSupplier, PolicyStepsRegistry stepRegistry,
Consumer<ClusterState> stateChangeConsumer) {
this.index = index;
super(index, currentStepKey);
this.policy = policy;
this.currentStepKey = currentStepKey;
this.nextStepKey = nextStepKey;
this.nowSupplier = nowSupplier;
this.stepRegistry = stepRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,19 @@ public class SetStepInfoUpdateTask extends IndexLifecycleClusterStateUpdateTask

private static final Logger logger = LogManager.getLogger(SetStepInfoUpdateTask.class);

private final Index index;
private final String policy;
private final Step.StepKey currentStepKey;
private final ToXContentObject stepInfo;

public SetStepInfoUpdateTask(Index index, String policy, Step.StepKey currentStepKey, ToXContentObject stepInfo) {
this.index = index;
super(index, currentStepKey);
this.policy = policy;
this.currentStepKey = currentStepKey;
this.stepInfo = stepInfo;
}

Index getIndex() {
return index;
}

/**
 * @return the policy value supplied when this task was constructed
 */
String getPolicy() {
    return this.policy;
}

Step.StepKey getCurrentStepKey() {
return currentStepKey;
}

/**
 * @return the step info object supplied when this task was constructed
 */
ToXContentObject getStepInfo() {
    return this.stepInfo;
}
Expand Down

0 comments on commit fcc6a9a

Please sign in to comment.