Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ILM] fix retry so it picks up latest policy and executes async action #35406

Merged
merged 4 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
import org.junit.Before;
Expand Down Expand Up @@ -176,6 +177,41 @@ public void testMoveToRolloverStep() throws Exception {
assertBusy(() -> assertFalse(indexExists(shrunkenOriginalIndex)));
}

/**
 * Drives an index into a failed shrink step using a policy whose target shard count is larger than
 * the index's shard count, then replaces the policy with a valid one, calls the ILM retry endpoint,
 * and checks that the shrunken index appears with the corrected shard count.
 */
public void testRetryFailedShrinkAction() throws Exception {
    final int originalShardCount = 6;
    final int shrinkFactor = randomFrom(2, 3, 6);
    final int targetShardCount = originalShardCount / shrinkFactor;
    final String shrunkIndexName = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;

    Settings.Builder initialSettings = Settings.builder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, originalShardCount)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);
    createIndexWithSettings(index, initialSettings);

    // start with a policy asking for more shards than the index has, and wait for the shrink step to fail
    final int invalidShardCount = originalShardCount + randomIntBetween(1, originalShardCount);
    createNewSingletonPolicy("warm", new ShrinkAction(invalidShardCount));
    updatePolicy(index, policy);
    assertBusy(() -> assertThat(getFailedStepForIndex(index), equalTo(ShrinkStep.NAME)));

    // replace the policy with one that requests a valid shard count
    createNewSingletonPolicy("warm", new ShrinkAction(targetShardCount));
    updatePolicy(index, policy);

    // ask ILM to retry the failed step
    assertOK(client().performRequest(new Request("POST", index + "/_ilm/retry")));

    // the corrected policy should be picked up and the index shrunken
    assertBusy(() -> {
        logger.error(explainIndex(index));
        assertTrue(indexExists(shrunkIndexName));
        assertTrue(aliasExists(shrunkIndexName, index));
        Map<String, Object> shrunkSettings = getOnlyIndexSettings(shrunkIndexName);
        assertThat(getStepKeyForIndex(shrunkIndexName), equalTo(TerminalPolicyStep.KEY));
        assertThat(shrunkSettings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(targetShardCount)));
        assertThat(shrunkSettings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
    });

    // indexing a document is expected to be rejected at this point
    expectThrows(ResponseException.class, this::indexDocument);
}

public void testRolloverAction() throws Exception {
String originalIndex = index + "-000001";
String secondIndex = index + "-000002";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] in
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
String failedStep = lifecycleState.getFailedStep();
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName())
&& Strings.isNullOrEmpty(failedStep) == false) {
&& Strings.isNullOrEmpty(failedStep) == false) {
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry);
} else {
Expand All @@ -369,7 +369,7 @@ private static LifecycleExecutionState moveExecutionStateToNextStep(LifecyclePol
updatedState.setFailedStep(null);
updatedState.setStepInfo(null);

if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
if (currentStep.getPhase().equals(nextStep.getPhase()) == false || ErrorStep.NAME.equals(currentStep.getName())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is the right place to do this. We only want this functionality to occur in the retry API, so I think it should only be that code path that gets this phase definition refresh. Maybe we can have a boolean forcePhaseDefinitionRefresh parameter and use that to override the behaviour for the retry method. I can also see a case in the future where we might want to have an option on the moveToStep API to force the phase definition to be updated too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, this was my issue with this same thing. I thought about creating the boolean flag, but I find flags like this that significantly change behavior to be confusing. I didn't think about move-to-step potentially wanting the same thing, though. Given this potential usage outside of just errors, I think the flag would be appropriate so as not to duplicate much of the code. Thanks. I will try it and see how it reads.

final String newPhaseDefinition;
final Phase nextPhase;
if ("new".equals(nextStep.getPhase()) || TerminalPolicyStep.KEY.equals(nextStep)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Request;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Response;
Expand Down Expand Up @@ -55,6 +58,23 @@ public ClusterState execute(ClusterState currentState) {
return indexLifecycleService.moveClusterStateToFailedStep(currentState, request.indices());
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
for (String index : request.indices()) {
IndexMetaData idxMeta = newState.metaData().index(index);
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
StepKey retryStep = new StepKey(lifecycleState.getPhase(), lifecycleState.getAction(), lifecycleState.getStep());
if (idxMeta == null) {
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug("index [" + index + "] has been deleted after moving to step [" +
lifecycleState.getStep() + "], skipping async action check");
return;
}
logger.error("TRYING TRYING");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this log statement got left in accidentally.

indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
}
}

@Override
protected Response newResponse(boolean acknowledged) {
return new Response(acknowledged);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,25 +866,68 @@ public void testMoveClusterStateToFailedStep() {
String[] indices = new String[] { indexName };
String policyName = "my_policy";
long now = randomNonNegativeLong();
StepKey failedStepKey = new StepKey("current_phase", "current_action", "current_step");
StepKey failedStepKey = new StepKey("current_phase", MockAction.NAME, "current_step");
StepKey errorStepKey = new StepKey(failedStepKey.getPhase(), failedStepKey.getAction(), ErrorStep.NAME);
Step step = new MockStep(failedStepKey, null);
LifecyclePolicy policy = createPolicy(policyName, failedStepKey, null);
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(),
randomNonNegativeLong(), randomNonNegativeLong());

PolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policyName, step, indexName);
Settings.Builder indexSettingsBuilder = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_NAME, policyName);
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
lifecycleState.setPhase(errorStepKey.getPhase());
lifecycleState.setPhaseTime(now);
lifecycleState.setAction(errorStepKey.getAction());
lifecycleState.setActionTime(now);
lifecycleState.setStep(errorStepKey.getName());
lifecycleState.setStepTime(now);
lifecycleState.setFailedStep(failedStepKey.getName());
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(),
Collections.singletonList(policyMetadata));
Index index = clusterState.metaData().index(indexName).getIndex();
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now);
ClusterState nextClusterState = runner.moveClusterStateToFailedStep(clusterState, indices);
IndexLifecycleRunnerTests.assertClusterStateOnNextStep(clusterState, index, errorStepKey, failedStepKey,
nextClusterState, now);
}

/**
 * Checks that moving an index back to its failed step is rejected with an
 * {@link IllegalArgumentException} when the step registry is built with a step whose key differs
 * from the failed step recorded in the index's lifecycle execution state.
 */
public void testMoveClusterStateToFailedStepWithUnknownStep() {
    final String indexName = "my_index";
    final String policyName = "my_policy";
    final String[] indices = { indexName };
    final long now = randomNonNegativeLong();

    // the step the index failed on, and the matching error step it currently sits in
    final StepKey failedStepKey = new StepKey("current_phase", MockAction.NAME, "current_step");
    final StepKey errorStepKey = new StepKey(failedStepKey.getPhase(), failedStepKey.getAction(), ErrorStep.NAME);

    // the only step handed to the registry carries a different step name (and possibly a different phase)
    final String registeredPhase = randomFrom(failedStepKey.getPhase(), "other");
    final StepKey registeredStepKey = new StepKey(registeredPhase, MockAction.NAME, "different_step");
    final Step registeredStep = new MockStep(registeredStepKey, null);

    final LifecyclePolicy policy = createPolicy(policyName, failedStepKey, null);
    final LifecyclePolicyMetadata policyMetadata =
        new LifecyclePolicyMetadata(policy, Collections.emptyMap(), randomNonNegativeLong(), randomNonNegativeLong());
    final PolicyStepsRegistry registry = createOneStepPolicyStepRegistry(policyName, registeredStep, indexName);

    final Settings.Builder indexSettings = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName);

    // execution state: currently in the error step, with the failed step recorded
    final LifecycleExecutionState.Builder executionState = LifecycleExecutionState.builder();
    executionState.setPhase(errorStepKey.getPhase());
    executionState.setPhaseTime(now);
    executionState.setAction(errorStepKey.getAction());
    executionState.setActionTime(now);
    executionState.setStep(errorStepKey.getName());
    executionState.setStepTime(now);
    executionState.setFailedStep(failedStepKey.getName());

    final ClusterState clusterState = buildClusterState(indexName, indexSettings, executionState.build(),
        Collections.singletonList(policyMetadata));
    final IndexLifecycleRunner runner = new IndexLifecycleRunner(registry, null, () -> now);

    final String expectedMessage = "step [" + failedStepKey
        + "] for index [my_index] with policy [my_policy] does not exist";
    final IllegalArgumentException thrown = expectThrows(IllegalArgumentException.class,
        () -> runner.moveClusterStateToFailedStep(clusterState, indices));
    assertThat(thrown.getMessage(), equalTo(expectedMessage));
}

public void testMoveClusterStateToFailedStepIndexNotFound() {
String existingIndexName = "my_index";
String invalidIndexName = "does_not_exist";
Expand Down