Skip to content

Commit

Permalink
Miscellaneous ILM cleanups (elastic#118488)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Dec 11, 2024
1 parent e1d83e9 commit 3a6071c
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;

import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;
import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentSLMMode;
Expand Down Expand Up @@ -143,7 +144,10 @@ private ClusterState updateSLMState(final ClusterState currentState) {

@Override
public void onFailure(Exception e) {
logger.error("unable to update lifecycle metadata with new ilm mode [" + ilmMode + "], slm mode [" + slmMode + "]", e);
logger.error(
() -> Strings.format("unable to update lifecycle metadata with new ilm mode [%s], slm mode [%s]", ilmMode, slmMode),
e
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,7 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l
// Remove the frozen phase, we'll randomly re-add it later
.filter(pn -> TimeseriesLifecycleType.FROZEN_PHASE.equals(pn) == false)
.collect(Collectors.toList());
Map<String, Phase> phases = Maps.newMapWithExpectedSize(phaseNames.size());
Function<String, Set<String>> validActions = getPhaseToValidActions();
Function<String, LifecycleAction> randomAction = getNameToActionFunction();
// as what actions end up in the hot phase influence what actions are allowed in the subsequent phases we'll move the hot phase
// at the front of the phases to process (if it exists)
if (phaseNames.contains(TimeseriesLifecycleType.HOT_PHASE)) {
phaseNames.remove(TimeseriesLifecycleType.HOT_PHASE);
phaseNames.add(0, TimeseriesLifecycleType.HOT_PHASE);
}
boolean hotPhaseContainsSearchableSnap = false;
boolean coldPhaseContainsSearchableSnap = false;

// let's order the phases so we can reason about actions in a previous phase in order to generate a random *valid* policy
List<String> orderedPhases = new ArrayList<>(phaseNames.size());
for (String validPhase : TimeseriesLifecycleType.ORDERED_VALID_PHASES) {
Expand All @@ -170,6 +160,12 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l
}
}

Map<String, Phase> phases = Maps.newMapWithExpectedSize(phaseNames.size());
Function<String, Set<String>> validActions = getPhaseToValidActions();
Function<String, LifecycleAction> randomAction = getNameToActionFunction();
boolean hotPhaseContainsSearchableSnap = false;
boolean coldPhaseContainsSearchableSnap = false;

TimeValue prev = null;
for (String phase : orderedPhases) {
TimeValue after = prev == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

public class MockAction implements LifecycleAction {
public static final String NAME = "TEST_ACTION";
private List<Step> steps;
private final List<Step> steps;

private static final ObjectParser<MockAction, Void> PARSER = new ObjectParser<>(NAME, MockAction::new);
private final boolean safe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -121,7 +120,7 @@ protected Clock getClock() {

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
return List.of(
LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING,
LifecycleSettings.LIFECYCLE_NAME_SETTING,
LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING,
Expand Down Expand Up @@ -204,7 +203,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
}

private static List<NamedXContentRegistry.Entry> xContentEntries() {
return Arrays.asList(
return List.of(
// Custom Metadata
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
Expand Down Expand Up @@ -260,52 +259,38 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster,
Predicate<NodeFeature> clusterSupportsFeature
) {
List<RestHandler> handlers = new ArrayList<>();

handlers.addAll(
Arrays.asList(
// add ILM rest handlers
new RestPutLifecycleAction(),
new RestGetLifecycleAction(),
new RestDeleteLifecycleAction(),
new RestExplainLifecycleAction(),
new RestRemoveIndexLifecyclePolicyAction(),
new RestMoveToStepAction(),
new RestRetryAction(),
new RestStopAction(),
new RestStartILMAction(),
new RestGetStatusAction(),
new RestMigrateToDataTiersAction()
)
return List.of(
new RestPutLifecycleAction(),
new RestGetLifecycleAction(),
new RestDeleteLifecycleAction(),
new RestExplainLifecycleAction(),
new RestRemoveIndexLifecyclePolicyAction(),
new RestMoveToStepAction(),
new RestRetryAction(),
new RestStopAction(),
new RestStartILMAction(),
new RestGetStatusAction(),
new RestMigrateToDataTiersAction()
);
return handlers;
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
var ilmUsageAction = new ActionHandler<>(XPackUsageFeatureAction.INDEX_LIFECYCLE, IndexLifecycleUsageTransportAction.class);
var ilmInfoAction = new ActionHandler<>(XPackInfoFeatureAction.INDEX_LIFECYCLE, IndexLifecycleInfoTransportAction.class);
var migrateToDataTiersAction = new ActionHandler<>(MigrateToDataTiersAction.INSTANCE, TransportMigrateToDataTiersAction.class);
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
actions.add(ilmUsageAction);
actions.add(ilmInfoAction);
actions.add(migrateToDataTiersAction);
actions.addAll(
Arrays.asList(
// add ILM actions
new ActionHandler<>(ILMActions.PUT, TransportPutLifecycleAction.class),
new ActionHandler<>(GetLifecycleAction.INSTANCE, TransportGetLifecycleAction.class),
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifecycleAction.class),
new ActionHandler<>(ExplainLifecycleAction.INSTANCE, TransportExplainLifecycleAction.class),
new ActionHandler<>(RemoveIndexLifecyclePolicyAction.INSTANCE, TransportRemoveIndexLifecyclePolicyAction.class),
new ActionHandler<>(ILMActions.MOVE_TO_STEP, TransportMoveToStepAction.class),
new ActionHandler<>(ILMActions.RETRY, TransportRetryAction.class),
new ActionHandler<>(ILMActions.START, TransportStartILMAction.class),
new ActionHandler<>(ILMActions.STOP, TransportStopILMAction.class),
new ActionHandler<>(GetStatusAction.INSTANCE, TransportGetStatusAction.class)
)
return List.of(
new ActionHandler<>(XPackUsageFeatureAction.INDEX_LIFECYCLE, IndexLifecycleUsageTransportAction.class),
new ActionHandler<>(XPackInfoFeatureAction.INDEX_LIFECYCLE, IndexLifecycleInfoTransportAction.class),
new ActionHandler<>(MigrateToDataTiersAction.INSTANCE, TransportMigrateToDataTiersAction.class),
new ActionHandler<>(ILMActions.PUT, TransportPutLifecycleAction.class),
new ActionHandler<>(GetLifecycleAction.INSTANCE, TransportGetLifecycleAction.class),
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifecycleAction.class),
new ActionHandler<>(ExplainLifecycleAction.INSTANCE, TransportExplainLifecycleAction.class),
new ActionHandler<>(RemoveIndexLifecyclePolicyAction.INSTANCE, TransportRemoveIndexLifecyclePolicyAction.class),
new ActionHandler<>(ILMActions.MOVE_TO_STEP, TransportMoveToStepAction.class),
new ActionHandler<>(ILMActions.RETRY, TransportRetryAction.class),
new ActionHandler<>(ILMActions.START, TransportStartILMAction.class),
new ActionHandler<>(ILMActions.STOP, TransportStopILMAction.class),
new ActionHandler<>(GetStatusAction.INSTANCE, TransportGetStatusAction.class)
);
return actions;
}

List<ReservedClusterStateHandler<?>> reservedClusterStateHandlers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
Expand All @@ -39,7 +40,6 @@

import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -290,13 +290,7 @@ void onErrorMaybeRetryFailedStep(String policy, StepKey currentStep, IndexMetada
// IndexLifecycleRunner#runPeriodicStep} run the policy will still be in the ERROR step, as we haven't been able
// to move it back into the failed step, so we'll try again
submitUnlessAlreadyQueued(
String.format(
Locale.ROOT,
"ilm-retry-failed-step {policy [%s], index [%s], failedStep [%s]}",
policy,
index,
failedStep.getKey()
),
Strings.format("ilm-retry-failed-step {policy [%s], index [%s], failedStep [%s]}", policy, index, failedStep.getKey()),
new MoveToRetryFailedStepUpdateTask(indexMetadata.getIndex(), policy, currentStep, failedStep)
);
} else {
Expand Down Expand Up @@ -444,7 +438,7 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
} else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
logger.debug("[{}] running policy with current-step [{}]", indexMetadata.getIndex().getName(), currentStep.getKey());
submitUnlessAlreadyQueued(
String.format(Locale.ROOT, "ilm-execute-cluster-state-steps [%s]", currentStep),
Strings.format("ilm-execute-cluster-state-steps [%s]", currentStep),
new ExecuteStepsUpdateTask(policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier)
);
} else {
Expand All @@ -459,8 +453,7 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey newStepKey) {
logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, newStepKey);
submitUnlessAlreadyQueued(
String.format(
Locale.ROOT,
Strings.format(
"ilm-move-to-step {policy [%s], index [%s], currentStep [%s], nextStep [%s]}",
policy,
index.getName(),
Expand All @@ -486,13 +479,7 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte
e
);
submitUnlessAlreadyQueued(
String.format(
Locale.ROOT,
"ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}",
policy,
index.getName(),
currentStepKey
),
Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> {
IndexMetadata indexMetadata = clusterState.metadata().index(index);
registerFailedOperation(indexMetadata, e);
Expand All @@ -506,13 +493,7 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte
*/
private void setStepInfo(Index index, String policy, @Nullable Step.StepKey currentStepKey, ToXContentObject stepInfo) {
submitUnlessAlreadyQueued(
String.format(
Locale.ROOT,
"ilm-set-step-info {policy [%s], index [%s], currentStep [%s]}",
policy,
index.getName(),
currentStepKey
),
Strings.format("ilm-set-step-info {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ private void cancelJob() {
@Override
public void triggered(SchedulerEngine.Event event) {
if (event.jobName().equals(XPackField.INDEX_LIFECYCLE)) {
logger.trace("job triggered: " + event.jobName() + ", " + event.scheduledTime() + ", " + event.triggeredTime());
logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime());
triggerPolicies(clusterService.state(), false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -42,7 +43,6 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -269,9 +269,8 @@ public Set<Step.StepKey> parseStepKeysFromPhase(String policy, String currentPha
return parseStepsFromPhase(policy, currentPhase, phaseDefNonNull).stream().map(Step::getKey).collect(Collectors.toSet());
} catch (IOException e) {
logger.trace(
() -> String.format(
Locale.ROOT,
"unable to parse steps for policy [{}], phase [{}], and phase definition [{}]",
() -> Strings.format(
"unable to parse steps for policy [%s], phase [%s], and phase definition [%s]",
policy,
currentPhase,
phaseDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void onFailure(Exception e) {

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
rerouteService.reroute("cluster migrated to data tiers routing", Priority.NORMAL, new ActionListener<Void>() {
rerouteService.reroute("cluster migrated to data tiers routing", Priority.NORMAL, new ActionListener<>() {
@Override
public void onResponse(Void ignored) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class ILMHistoryStore implements Closeable {

public static final String ILM_HISTORY_DATA_STREAM = "ilm-history-" + INDEX_TEMPLATE_VERSION;

private static int ILM_HISTORY_BULK_SIZE = StrictMath.toIntExact(
private static final int ILM_HISTORY_BULK_SIZE = StrictMath.toIntExact(
ByteSizeValue.parseBytesSizeValue(
System.getProperty("es.indices.lifecycle.history.bulk.size", "50MB"),
"es.indices.lifecycle.history.bulk.size"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class IndexLifecycleTransitionTests extends ESTestCase {
public void testMoveClusterStateToNextStep() {
String indexName = "my_index";
LifecyclePolicy policy = randomValueOtherThanMany(
p -> p.getPhases().size() == 0,
p -> p.getPhases().isEmpty(),
() -> LifecyclePolicyTests.randomTestLifecyclePolicy("policy")
);
Phase nextPhase = policy.getPhases()
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testMoveClusterStateToNextStep() {
public void testMoveClusterStateToNextStepSamePhase() {
String indexName = "my_index";
LifecyclePolicy policy = randomValueOtherThanMany(
p -> p.getPhases().size() == 0,
p -> p.getPhases().isEmpty(),
() -> LifecyclePolicyTests.randomTestLifecyclePolicy("policy")
);
List<LifecyclePolicyMetadata> policyMetadatas = Collections.singletonList(
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testMoveClusterStateToNextStepSamePhase() {
public void testMoveClusterStateToNextStepSameAction() {
String indexName = "my_index";
LifecyclePolicy policy = randomValueOtherThanMany(
p -> p.getPhases().size() == 0,
p -> p.getPhases().isEmpty(),
() -> LifecyclePolicyTests.randomTestLifecyclePolicy("policy")
);
List<LifecyclePolicyMetadata> policyMetadatas = Collections.singletonList(
Expand Down Expand Up @@ -228,7 +228,7 @@ public void testSuccessfulValidatedMoveClusterStateToNextStep() {
String indexName = "my_index";
String policyName = "my_policy";
LifecyclePolicy policy = randomValueOtherThanMany(
p -> p.getPhases().size() == 0,
p -> p.getPhases().isEmpty(),
() -> LifecyclePolicyTests.randomTestLifecyclePolicy(policyName)
);
Phase nextPhase = policy.getPhases()
Expand Down Expand Up @@ -1436,6 +1436,6 @@ private void assertClusterStateStepInfo(
assertEquals(expectedstepInfoValue, newLifecycleState.stepInfo());
assertEquals(oldLifecycleState.phaseTime(), newLifecycleState.phaseTime());
assertEquals(oldLifecycleState.actionTime(), newLifecycleState.actionTime());
assertEquals(newLifecycleState.stepTime(), newLifecycleState.stepTime());
assertEquals(oldLifecycleState.stepTime(), newLifecycleState.stepTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void getRunningTaskFromNode(String persistentTaskId, ActionListener<Response> li
listener.onFailure(
new ResourceNotFoundException(
Strings.format(
"Persistent task [{}] is supposed to be running on node [{}], " + "but the task is not found on that node",
"Persistent task [%s] is supposed to be running on node [%s], but the task is not found on that node",
persistentTaskId,
clusterService.localNode().getId()
)
Expand All @@ -106,7 +106,7 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String
listener.onFailure(
new ResourceNotFoundException(
Strings.format(
"Persistent task [{}] is supposed to be running on node [{}], but that node is not part of the cluster",
"Persistent task [%s] is supposed to be running on node [%s], but that node is not part of the cluster",
request.getIndex(),
nodeId
)
Expand Down

0 comments on commit 3a6071c

Please sign in to comment.