Skip to content

Commit

Permalink
Add replica handling to the ILM MountSnapshotStep (elastic#118687)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Dec 13, 2024
1 parent db0d6ce commit a58ac99
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -20,13 +21,15 @@
*/
public abstract class AsyncWaitStep extends Step {

@Nullable
private final Client client;

public AsyncWaitStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey);
this.client = client;
}

@Nullable
protected Client getClient() {
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep(
waitTimeSeriesEndTimePassesKey,
cleanSnapshotKey,
Instant::now,
client
Instant::now
);
CleanupSnapshotStep cleanupSnapshotStep = new CleanupSnapshotStep(cleanSnapshotKey, deleteStepKey, client);
DeleteStep deleteStep = new DeleteStep(deleteStepKey, nextStepKey, client);
Expand All @@ -108,8 +107,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep(
waitTimeSeriesEndTimePassesKey,
deleteStepKey,
Instant::now,
client
Instant::now
);
DeleteStep deleteStep = new DeleteStep(deleteStepKey, nextStepKey, client);
return List.of(waitForNoFollowersStep, waitUntilTimeSeriesEndTimeStep, deleteStep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep(
waitTimeSeriesEndTimePassesKey,
readOnlyKey,
Instant::now,
client
Instant::now
);
// Mark source index as read-only
ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, generateDownsampleIndexNameKey, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep(
waitTimeSeriesEndTimePassesKey,
codecChange ? closeKey : forceMergeKey,
Instant::now,
client
Instant::now
);

// Indices already in this step key when upgrading need to know how to move forward but stop making the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {
private final MountSearchableSnapshotRequest.Storage storageType;
@Nullable
private final Integer totalShardsPerNode;
private final int replicas;

public MountSnapshotStep(
StepKey key,
StepKey nextStepKey,
Client client,
String restoredIndexPrefix,
MountSearchableSnapshotRequest.Storage storageType,
@Nullable Integer totalShardsPerNode
@Nullable Integer totalShardsPerNode,
int replicas
) {
super(key, nextStepKey, client);
this.restoredIndexPrefix = restoredIndexPrefix;
Expand All @@ -57,16 +59,10 @@ public MountSnapshotStep(
throw new IllegalArgumentException("[" + SearchableSnapshotAction.TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1");
}
this.totalShardsPerNode = totalShardsPerNode;
}

public MountSnapshotStep(
StepKey key,
StepKey nextStepKey,
Client client,
String restoredIndexPrefix,
MountSearchableSnapshotRequest.Storage storageType
) {
this(key, nextStepKey, client, restoredIndexPrefix, storageType, null);
// this isn't directly settable by the user, so validation by assertion is sufficient
assert replicas >= 0 : "number of replicas must be gte zero, but was [" + replicas + "]";
this.replicas = replicas;
}

@Override
Expand All @@ -87,6 +83,10 @@ public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}

public int getReplicas() {
return replicas;
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
String indexName = indexMetadata.getIndex().getName();
Expand Down Expand Up @@ -162,11 +162,13 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
}

final Settings.Builder settingsBuilder = Settings.builder();

overrideTierPreference(this.getKey().phase()).ifPresent(override -> settingsBuilder.put(DataTier.TIER_PREFERENCE, override));
if (totalShardsPerNode != null) {
settingsBuilder.put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), totalShardsPerNode);
}
if (replicas > 0) {
settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicas);
}

final MountSearchableSnapshotRequest mountSearchableSnapshotRequest = new MountSearchableSnapshotRequest(
TimeValue.MAX_VALUE,
Expand Down Expand Up @@ -245,7 +247,7 @@ String[] ignoredIndexSettings() {

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType, totalShardsPerNode);
return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType, totalShardsPerNode, replicas);
}

@Override
Expand All @@ -260,6 +262,7 @@ public boolean equals(Object obj) {
return super.equals(obj)
&& Objects.equals(restoredIndexPrefix, other.restoredIndexPrefix)
&& Objects.equals(storageType, other.storageType)
&& Objects.equals(totalShardsPerNode, other.totalShardsPerNode);
&& Objects.equals(totalShardsPerNode, other.totalShardsPerNode)
&& Objects.equals(replicas, other.replicas);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep(
waitTimeSeriesEndTimePassesKey,
readOnlyKey,
Instant::now,
client
Instant::now
);
ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, nextStepKey, client);
return List.of(checkNotWriteIndexStep, waitUntilTimeSeriesEndTimeStep, readOnlyStep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public String getSnapshotRepository() {
return snapshotRepository;
}

@Nullable
public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}
Expand Down Expand Up @@ -230,8 +231,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep(
waitTimeSeriesEndTimePassesKey,
skipGeneratingSnapshotKey,
Instant::now,
client
Instant::now
);

// When generating a snapshot, we either jump to the force merge step, or we skip the
Expand Down Expand Up @@ -318,7 +318,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
client,
getRestoredIndexPrefix(mountSnapshotKey),
storageType,
totalShardsPerNode
totalShardsPerNode,
0
);
WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep(
waitForGreenRestoredIndexKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep(
waitTimeSeriesEndTimePassesKey,
readOnlyKey,
Instant::now,
client
Instant::now
);
ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, checkTargetShardsCountKey, client);
CheckTargetShardsCountStep checkTargetShardsCountStep = new CheckTargetShardsCountStep(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Represents the lifecycle of an index from creation to deletion. A
Expand All @@ -49,7 +48,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
static final String DELETE_PHASE = "delete";
public static final List<String> ORDERED_VALID_PHASES = List.of(HOT_PHASE, WARM_PHASE, COLD_PHASE, FROZEN_PHASE, DELETE_PHASE);

public static final List<String> ORDERED_VALID_HOT_ACTIONS = Stream.of(
public static final List<String> ORDERED_VALID_HOT_ACTIONS = List.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
RolloverAction.NAME,
Expand All @@ -58,8 +57,8 @@ public class TimeseriesLifecycleType implements LifecycleType {
ShrinkAction.NAME,
ForceMergeAction.NAME,
SearchableSnapshotAction.NAME
).filter(Objects::nonNull).toList();
public static final List<String> ORDERED_VALID_WARM_ACTIONS = Stream.of(
);
public static final List<String> ORDERED_VALID_WARM_ACTIONS = List.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
ReadOnlyAction.NAME,
Expand All @@ -68,8 +67,8 @@ public class TimeseriesLifecycleType implements LifecycleType {
MigrateAction.NAME,
ShrinkAction.NAME,
ForceMergeAction.NAME
).filter(Objects::nonNull).toList();
public static final List<String> ORDERED_VALID_COLD_ACTIONS = Stream.of(
);
public static final List<String> ORDERED_VALID_COLD_ACTIONS = List.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
ReadOnlyAction.NAME,
Expand All @@ -78,7 +77,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
AllocateAction.NAME,
MigrateAction.NAME,
FreezeAction.NAME
).filter(Objects::nonNull).toList();
);
public static final List<String> ORDERED_VALID_FROZEN_ACTIONS = List.of(UnfollowAction.NAME, SearchableSnapshotAction.NAME);
public static final List<String> ORDERED_VALID_DELETE_ACTIONS = List.of(WaitForSnapshotAction.NAME, DeleteAction.NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
Expand All @@ -33,8 +32,8 @@ public class WaitUntilTimeSeriesEndTimePassesStep extends AsyncWaitStep {
public static final String NAME = "check-ts-end-time-passed";
private final Supplier<Instant> nowSupplier;

public WaitUntilTimeSeriesEndTimePassesStep(StepKey key, StepKey nextStepKey, Supplier<Instant> nowSupplier, Client client) {
super(key, nextStepKey, client);
public WaitUntilTimeSeriesEndTimePassesStep(StepKey key, StepKey nextStepKey, Supplier<Instant> nowSupplier) {
super(key, nextStepKey, null);
this.nowSupplier = nowSupplier;
}

Expand Down
Loading

0 comments on commit a58ac99

Please sign in to comment.