Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ILM to transition to implicit cached steps #91779

Merged
merged 12 commits into from
Mar 20, 2023
6 changes: 6 additions & 0 deletions docs/changelog/91779.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91779
summary: Allow ILM to transition to implicit cached steps
area: ILM+SLM
type: bug
issues:
- 91749
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public static boolean isIndexPhaseDefinitionUpdatable(
* information, returns null.
*/
@Nullable
public static Set<Step.StepKey> readStepKeys(
static Set<Step.StepKey> readStepKeys(
final NamedXContentRegistry xContentRegistry,
final Client client,
final String phaseDef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ public static void validateTransition(
}

final Set<Step.StepKey> cachedStepKeys = stepRegistry.parseStepKeysFromPhase(
lifecycleState.phaseDefinition(),
lifecycleState.phase()
policyName,
lifecycleState.phase(),
lifecycleState.phaseDefinition()
);
boolean isNewStepCached = cachedStepKeys != null && cachedStepKeys.contains(newStepKey);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCacheManagement;
import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
Expand All @@ -42,12 +41,14 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class PolicyStepsRegistry {
private static final Logger logger = LogManager.getLogger(PolicyStepsRegistry.class);
Expand Down Expand Up @@ -245,11 +246,31 @@ public Step.StepKey getFirstStepForPhaseAndAction(ClusterState state, Index inde

/*
* Parses the step keys from the {@code phaseDef} for the given phase.
* ILM makes use of some implicit steps that belong to actions that we automatically inject
* (e.g. unfollow and migrate) or special-purpose steps like the phase `complete` step.
*
* This method returns **all** the steps that are part of the phase definition including the implicit steps.
*
* Returns null if there's a parsing error.
*/
@Nullable
public Set<Step.StepKey> parseStepKeysFromPhase(String phaseDef, String currentPhase) {
return PhaseCacheManagement.readStepKeys(xContentRegistry, client, phaseDef, currentPhase, licenseState);
public Set<Step.StepKey> parseStepKeysFromPhase(String policy, String currentPhase, String phaseDef) {
    // Collects the key of every step parsed from the given phase definition. A null phase definition is
    // replaced by the initialization phase before parsing. Returns null when parsing fails with an IOException.
    try {
        String phaseDefNonNull = Objects.requireNonNullElse(phaseDef, InitializePolicyContextStep.INITIALIZATION_PHASE);
        return parseStepsFromPhase(policy, currentPhase, phaseDefNonNull).stream().map(Step::getKey).collect(Collectors.toSet());
    } catch (IOException e) {
        // String.format only understands %-style specifiers. Log4j-style "{}" placeholders would be emitted
        // literally and the policy/phase/definition values silently dropped from the message.
        logger.trace(
            () -> String.format(
                Locale.ROOT,
                "unable to parse steps for policy [%s], phase [%s], and phase definition [%s]",
                policy,
                currentPhase,
                phaseDef
            ),
            e
        );
        return null;
    }
}

private List<Step> parseStepsFromPhase(String policy, String currentPhase, String phaseDef) throws IOException {
Expand Down Expand Up @@ -341,8 +362,10 @@ public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKe
}

// parse phase steps from the phase definition in the index settings
final String phaseJson = Optional.ofNullable(indexMetadata.getLifecycleExecutionState().phaseDefinition())
.orElse(InitializePolicyContextStep.INITIALIZATION_PHASE);
final String phaseJson = Objects.requireNonNullElse(
indexMetadata.getLifecycleExecutionState().phaseDefinition(),
InitializePolicyContextStep.INITIALIZATION_PHASE
);

final List<Step> phaseSteps;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,66 @@ public void testValidateTransitionToCachedStepWhenMissingPhaseFromPolicy() {
}
}

// Checks that IndexLifecycleTransition.validateTransition does not throw when moving between two steps of the
// `migrate` action while the cached phase definition stored in the execution state only declares `set_priority`.
public void testValidateTransitionToInjectedMissingStep() {
// we'll test the case when the warm phase was deleted and the next step is an injected one

// execution state: index is currently in warm/migrate/migrate and carries a cached phase definition
// whose only explicitly configured action is set_priority
LifecycleExecutionState.Builder executionState = LifecycleExecutionState.builder()
.setPhase("warm")
.setAction("migrate")
.setStep("migrate")
.setPhaseDefinition("""
{
"policy" : "my-policy",
"phase_definition" : {
"min_age" : "20m",
"actions" : {
"set_priority" : {
"priority" : 150
}
}
},
"version" : 1,
"modified_date_in_millis" : 1578521007076
}""");

IndexMetadata meta = buildIndexMetadata("my-policy", executionState);

try (Client client = new NoOpClient(getTestName())) {
// both keys live in the warm phase's migrate action; only the step name differs
Step.StepKey currentStepKey = new Step.StepKey("warm", MigrateAction.NAME, MigrateAction.NAME);
Step.StepKey nextStepKey = new Step.StepKey("warm", MigrateAction.NAME, DataTierMigrationRoutedStep.NAME);

// the registry passed to validateTransition below is built from this single hot-phase rollover step,
// i.e. none of the steps handed to the registry helper is a warm-phase step
Step.StepKey waitForRolloverStepKey = new Step.StepKey("hot", RolloverAction.NAME, WaitForRolloverReadyStep.NAME);
Step.StepKey rolloverStepKey = new Step.StepKey("hot", RolloverAction.NAME, RolloverStep.NAME);
Step waitForRolloverReadyStep = new WaitForRolloverReadyStep(
waitForRolloverStepKey,
rolloverStepKey,
client,
null,
null,
null,
1L,
null,
null,
null,
null,
null,
null
);

try {
IndexLifecycleTransition.validateTransition(
meta,
currentStepKey,
nextStepKey,
createOneStepPolicyStepRegistry("my-policy", waitForRolloverReadyStep)
);
} catch (Exception e) {
// log the cause before failing so the reason for the rejected transition is visible in the test output
logger.error(e.getMessage(), e);
fail("validateTransition should not throw exception on valid transitions");
}
}
}

public void testMoveClusterStateToFailedStep() {
String indexName = "my_index";
String policyName = "my_policy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -27,20 +28,34 @@
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.metadata.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MoveToNextStepUpdateTaskTests extends ESTestCase {

private static final NamedXContentRegistry REGISTRY;

String policy;
ClusterState clusterState;
Index index;
LifecyclePolicy lifecyclePolicy;

// Initializes the shared REGISTRY from the named x-content entries exposed by an IndexLifecycle instance
// created with empty settings; the instance is closed by the try-with-resources once the entries are copied.
static {
try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) {
List<NamedXContentRegistry.Entry> entries = new ArrayList<>(indexLifecycle.getNamedXContent());
REGISTRY = new NamedXContentRegistry(entries);
}
}

@Before
public void setupClusterState() {
policy = randomAlphaOfLength(10);
Expand Down Expand Up @@ -75,13 +90,22 @@ public void testExecuteSuccessfullyMoved() throws Exception {
setStateToKey(currentStepKey, now);

AtomicBoolean changed = new AtomicBoolean(false);
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
AlwaysExistingStepRegistry stepRegistry = new AlwaysExistingStepRegistry(client);
stepRegistry.update(
new IndexLifecycleMetadata(
Map.of(policy, new LifecyclePolicyMetadata(lifecyclePolicy, Collections.emptyMap(), 2L, 2L)),
OperationMode.RUNNING
)
);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(
index,
policy,
currentStepKey,
nextStepKey,
() -> now,
new AlwaysExistingStepRegistry(),
stepRegistry,
state -> changed.set(true)
);
ClusterState newState = task.execute(clusterState);
Expand Down Expand Up @@ -140,13 +164,22 @@ public void testExecuteSuccessfulMoveWithInvalidNextStep() throws Exception {
setStateToKey(currentStepKey, now);

SetOnce<Boolean> changed = new SetOnce<>();
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
AlwaysExistingStepRegistry stepRegistry = new AlwaysExistingStepRegistry(client);
stepRegistry.update(
new IndexLifecycleMetadata(
Map.of(policy, new LifecyclePolicyMetadata(lifecyclePolicy, Collections.emptyMap(), 2L, 2L)),
OperationMode.RUNNING
)
);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(
index,
policy,
currentStepKey,
invalidNextStep,
() -> now,
new AlwaysExistingStepRegistry(),
stepRegistry,
s -> changed.set(true)
);
ClusterState newState = task.execute(clusterState);
Expand Down Expand Up @@ -186,7 +219,11 @@ public void testOnFailure() {
private static class AlwaysExistingStepRegistry extends PolicyStepsRegistry {

AlwaysExistingStepRegistry() {
super(new NamedXContentRegistry(Collections.emptyList()), null, null);
this(null);
}

AlwaysExistingStepRegistry(Client client) {
super(REGISTRY, client, null);
}

@Override
Expand Down Expand Up @@ -215,7 +252,18 @@ private void setStateToKey(StepKey stepKey, long now) {
lifecycleState.setActionTime(now);
lifecycleState.setStep(stepKey.name());
lifecycleState.setStepTime(now);
lifecycleState.setPhaseDefinition("{\"actions\":{\"TEST_ACTION\":{}}}");

lifecycleState.setPhaseDefinition(String.format(Locale.ROOT, """
{
"policy" : "%s",
"phase_definition" : {
"min_age" : "20m",
"actions" : {
}
},
"version" : 1,
"modified_date_in_millis" : 1578521007076
}""", policy));
clusterState = ClusterState.builder(clusterState)
.metadata(
Metadata.builder(clusterState.getMetadata())
Expand Down