Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ILM] fix retry so it picks up latest policy and executes async action #35406

Merged
merged 4 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
import org.junit.Before;
Expand Down Expand Up @@ -178,6 +179,41 @@ public void testMoveToRolloverStep() throws Exception {
assertBusy(() -> assertFalse(indexExists(shrunkenOriginalIndex)));
}

/**
 * Verifies that retrying a failed shrink action picks up the latest version of the policy.
 *
 * The index is first given a shrink policy whose target shard count is larger than the
 * index's own shard count, and the test waits until the failed step is reported as
 * {@link ShrinkStep#NAME}. The policy is then replaced with one that targets a valid
 * divisor of the shard count, the retry endpoint is called, and the test waits for the
 * shrunken index to exist with the expected shard count, alias, write block and
 * terminal step.
 */
public void testRetryFailedShrinkAction() throws Exception {
    int numShards = 6;
    int divisor = randomFrom(2, 3, 6);
    int expectedFinalShards = numShards / divisor;
    String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
    createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));

    // Request more shards than the source index has; the test expects this to leave the
    // index with ShrinkStep recorded as its failed step.
    createNewSingletonPolicy("warm", new ShrinkAction(numShards + randomIntBetween(1, numShards)));
    updatePolicy(index, policy);
    assertBusy(() -> {
        String failedStep = getFailedStepForIndex(index);
        assertThat(failedStep, equalTo(ShrinkStep.NAME));
    });

    // update policy to be correct
    createNewSingletonPolicy("warm", new ShrinkAction(expectedFinalShards));
    updatePolicy(index, policy);

    // retry step
    Request retryRequest = new Request("POST", index + "/_ilm/retry");
    assertOK(client().performRequest(retryRequest));

    // assert corrected policy is picked up and index is shrunken
    assertBusy(() -> {
        // Diagnostic output only: this runs on every polling attempt, so it is logged at
        // INFO rather than ERROR to avoid flagging a normal intermediate state as a failure.
        logger.info("explain output for index [{}]: {}", index, explainIndex(index));
        assertTrue(indexExists(shrunkenIndex));
        assertTrue(aliasExists(shrunkenIndex, index));
        Map<String, Object> settings = getOnlyIndexSettings(shrunkenIndex);
        assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(TerminalPolicyStep.KEY));
        assertThat(settings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards)));
        assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
    });

    // The test expects indexing a document to be rejected at this point.
    expectThrows(ResponseException.class, this::indexDocument);
}

public void testRolloverAction() throws Exception {
String originalIndex = index + "-000001";
String secondIndex = index + "-000002";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public ClusterState execute(final ClusterState currentState) throws IOException
return state;
} else {
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
currentStep.getNextStepKey(), nowSupplier);
currentStep.getNextStepKey(), nowSupplier, false);
}
} else {
// cluster state wait step so evaluate the
Expand All @@ -125,7 +125,7 @@ public ClusterState execute(final ClusterState currentState) throws IOException
return state;
} else {
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
currentStep.getNextStepKey(), nowSupplier);
currentStep.getNextStepKey(), nowSupplier, false);
}
} else {
logger.trace("[{}] condition not met ({}) [{}], returning existing state",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,13 @@ static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, Inde
* @param nextStepKey The next step to move the index into
* @param nowSupplier The current-time supplier for updating when steps changed
* @param stepRegistry The steps registry to check a step-key's existence in the index's current policy
* @param forcePhaseDefinitionRefresh When true, step information will be recompiled from the latest version of the
* policy. Otherwise, existing phase definition is used.
* @return The updated cluster state where the index moved to <code>nextStepKey</code>
*/
static ClusterState moveClusterStateToStep(String indexName, ClusterState currentState, StepKey currentStepKey,
StepKey nextStepKey, LongSupplier nowSupplier,
PolicyStepsRegistry stepRegistry) {
PolicyStepsRegistry stepRegistry, boolean forcePhaseDefinitionRefresh) {
IndexMetaData idxMeta = currentState.getMetaData().index(indexName);
Settings indexSettings = idxMeta.getSettings();
String indexPolicySetting = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
Expand All @@ -295,18 +297,19 @@ static ClusterState moveClusterStateToStep(String indexName, ClusterState curren
"] with policy [" + indexPolicySetting + "] does not exist");
}

return IndexLifecycleRunner.moveClusterStateToNextStep(idxMeta.getIndex(), currentState, currentStepKey, nextStepKey, nowSupplier);
return IndexLifecycleRunner.moveClusterStateToNextStep(idxMeta.getIndex(), currentState, currentStepKey,
nextStepKey, nowSupplier, forcePhaseDefinitionRefresh);
}

static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
LongSupplier nowSupplier) {
LongSupplier nowSupplier, boolean forcePhaseDefinitionRefresh) {
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
IndexLifecycleMetadata ilmMeta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
LifecyclePolicyMetadata policyMetadata = ilmMeta.getPolicyMetadatas()
.get(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()));
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
LifecycleExecutionState newLifecycleState = moveExecutionStateToNextStep(policyMetadata,
lifecycleState, currentStep, nextStep, nowSupplier);
lifecycleState, currentStep, nextStep, nowSupplier, forcePhaseDefinitionRefresh);
ClusterState.Builder newClusterStateBuilder = newClusterStateWithLifecycleState(index, clusterState, newLifecycleState);

return newClusterStateBuilder.build();
Expand All @@ -324,7 +327,7 @@ static ClusterState moveClusterStateToErrorStep(Index index, ClusterState cluste
causeXContentBuilder.endObject();
LifecycleExecutionState nextStepState = moveExecutionStateToNextStep(policyMetadata,
LifecycleExecutionState.fromIndexMetadata(idxMeta), currentStep, new StepKey(currentStep.getPhase(),
currentStep.getAction(), ErrorStep.NAME), nowSupplier);
currentStep.getAction(), ErrorStep.NAME), nowSupplier, false);
LifecycleExecutionState.Builder failedState = LifecycleExecutionState.builder(nextStepState);
failedState.setFailedStep(currentStep.getName());
failedState.setStepInfo(BytesReference.bytes(causeXContentBuilder).utf8ToString());
Expand All @@ -343,9 +346,9 @@ ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] in
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
String failedStep = lifecycleState.getFailedStep();
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName())
&& Strings.isNullOrEmpty(failedStep) == false) {
&& Strings.isNullOrEmpty(failedStep) == false) {
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry);
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry, true);
} else {
throw new IllegalArgumentException("cannot retry an action for an index ["
+ index + "] that has not encountered an error when running a Lifecycle Policy");
Expand All @@ -357,7 +360,8 @@ ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] in
private static LifecycleExecutionState moveExecutionStateToNextStep(LifecyclePolicyMetadata policyMetadata,
LifecycleExecutionState existingState,
StepKey currentStep, StepKey nextStep,
LongSupplier nowSupplier) {
LongSupplier nowSupplier,
boolean forcePhaseDefinitionRefresh) {
long nowAsMillis = nowSupplier.getAsLong();
LifecycleExecutionState.Builder updatedState = LifecycleExecutionState.builder(existingState);
updatedState.setPhase(nextStep.getPhase());
Expand All @@ -369,7 +373,7 @@ private static LifecycleExecutionState moveExecutionStateToNextStep(LifecyclePol
updatedState.setFailedStep(null);
updatedState.setStepInfo(null);

if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
if (currentStep.getPhase().equals(nextStep.getPhase()) == false || forcePhaseDefinitionRefresh) {
final String newPhaseDefinition;
final Phase nextPhase;
if ("new".equals(nextStep.getPhase()) || TerminalPolicyStep.KEY.equals(nextStep)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void maybeRunAsyncAction(ClusterState clusterState, IndexMetaData indexMe

/**
 * Builds a cluster state in which the given index has been moved from
 * {@code currentStepKey} to {@code nextStepKey}.
 *
 * This is a thin delegate to {@code IndexLifecycleRunner.moveClusterStateToStep}, supplying
 * this service's {@code nowSupplier} and {@code policyRegistry}.
 *
 * NOTE(review): this excerpt shows two argument continuation lines, one without and one
 * with a trailing {@code false}. They look like the before/after sides of a diff; confirm
 * against the real file. The runner's Javadoc describes that trailing flag as
 * {@code forcePhaseDefinitionRefresh}; when it is not true the existing phase definition
 * is used.
 *
 * @param currentState   the cluster state to start from
 * @param indexName      name of the index to move
 * @param currentStepKey the step the index is expected to be on
 * @param nextStepKey    the step to move the index to
 * @return the cluster state returned by the runner
 */
public ClusterState moveClusterStateToStep(ClusterState currentState, String indexName, StepKey currentStepKey, StepKey nextStepKey) {
return IndexLifecycleRunner.moveClusterStateToStep(indexName, currentState, currentStepKey, nextStepKey,
nowSupplier, policyRegistry);
nowSupplier, policyRegistry, false);
}

public ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ClusterState execute(ClusterState currentState) {
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexILMData))) {
logger.trace("moving [{}] to next step ({})", index.getName(), nextStepKey);
return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStepKey, nextStepKey, nowSupplier);
return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, false);
} else {
// either the policy has changed or the step is now
// not the same as when we submitted the update task. In
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Request;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Response;
Expand Down Expand Up @@ -55,6 +58,22 @@ public ClusterState execute(ClusterState currentState) {
return indexLifecycleService.moveClusterStateToFailedStep(currentState, request.indices());
}

/**
 * Invoked once the retry cluster-state update has been processed. For every index named
 * in the request, reads the index's lifecycle execution state from {@code newState},
 * builds the step key it now points at, and hands it to
 * {@code indexLifecycleService.maybeRunAsyncAction}.
 *
 * Indices that are absent from {@code newState} are skipped individually, so the
 * remaining indices in the request are still processed.
 *
 * @param source   description of the update task's source (unused here)
 * @param oldState the cluster state before the update (unused here)
 * @param newState the cluster state after the update
 */
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    for (String index : request.indices()) {
        IndexMetaData idxMeta = newState.metaData().index(index);
        // The null check has to come before any use of idxMeta: the lifecycle state is
        // derived from it, so nothing about the step can be read for a missing index.
        if (idxMeta == null) {
            // The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
            logger.debug("index [{}] has been deleted after being moved to its retry step, skipping async action check", index);
            // Skip only this index; the other requested indices still need their check.
            continue;
        }
        LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
        StepKey retryStep = new StepKey(lifecycleState.getPhase(), lifecycleState.getAction(), lifecycleState.getStep());
        indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
    }
}

@Override
protected Response newResponse(boolean acknowledged) {
return new Response(acknowledged);
Expand Down
Loading