Skip to content

Commit

Permalink
Wire waiting step into searchable_snapshot action
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Dec 16, 2024
1 parent 85ad2c5 commit 8216ecb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.license.LicenseUtils;
Expand Down Expand Up @@ -171,6 +172,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
StepKey swapAliasesKey = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME);
StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME);
StepKey replicateForKey = new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME);
StepKey dropReplicasKey = new StepKey(phase, NAME, UpdateSettingsStep.NAME);

// Before going through all these steps, first check if we need to do them at all. For example, the index could already be
// a searchable snapshot of the same type and repository, in which case we don't need to do anything. If that is detected,
Expand Down Expand Up @@ -345,19 +348,20 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
getRestoredIndexPrefix(mountSnapshotKey),
storageType,
totalShardsPerNode,
0
replicateFor != null ? 1 : 0 // if the 'replicate_for' option is set, then have a replica, otherwise don't
);
WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep(
waitForGreenRestoredIndexKey,
copyMetadataKey,
ClusterHealthStatus.GREEN,
getRestoredIndexPrefix(waitForGreenRestoredIndexKey)
);
StepKey keyForReplicateForOrContinue = replicateFor != null ? replicateForKey : nextStepKey;
CopyExecutionStateStep copyMetadataStep = new CopyExecutionStateStep(
copyMetadataKey,
copyLifecyclePolicySettingKey,
(index, executionState) -> getRestoredIndexPrefix(copyMetadataKey) + index,
nextStepKey
keyForReplicateForOrContinue
);
CopySettingsStep copySettingsStep = new CopySettingsStep(
copyLifecyclePolicySettingKey,
Expand Down Expand Up @@ -390,6 +394,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
getRestoredIndexPrefix(swapAliasesKey)
);

Step replicateForStep = new WaitUntilReplicateForTimePassesStep(replicateForKey, dropReplicasKey, replicateFor);
UpdateSettingsStep dropRelicasStep = new UpdateSettingsStep(
dropReplicasKey,
nextStepKey,
client,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);

List<Step> steps = new ArrayList<>();
steps.add(conditionalSkipActionStep);
steps.add(checkNoWriteIndexStep);
Expand All @@ -408,6 +420,10 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
steps.add(waitForGreenIndexHealthStep);
steps.add(copyMetadataStep);
steps.add(copySettingsStep);
if (replicateFor != null) {
steps.add(replicateForStep);
steps.add(dropRelicasStep);
}
steps.add(isDataStreamBranchingStep);
steps.add(replaceDataStreamBackingIndex);
steps.add(deleteSourceIndexStep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public void testToSteps() {

List<Step> steps = action.toSteps(null, phase, nextStepKey, null);

List<StepKey> expectedSteps = expectedStepKeys(phase, action.isForceMergeIndex());
List<StepKey> expectedSteps = expectedStepKeys(phase, action.isForceMergeIndex(), action.getReplicateFor() != null);

assertThat(steps.size(), is(expectedSteps.size()));
for (int i = 0; i < expectedSteps.size(); i++) {
assertThat("steps match expectation at index " + i, steps.get(i).getKey(), is(expectedSteps.get(i)));
Expand Down Expand Up @@ -94,7 +95,7 @@ public void testCreateWithInvalidTotalShardsPerNode() {
assertEquals("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1", exception.getMessage());
}

private List<StepKey> expectedStepKeys(String phase, boolean forceMergeIndex) {
private List<StepKey> expectedStepKeys(String phase, boolean forceMergeIndex, boolean hasReplicateFor) {
return Stream.of(
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_ACTION_STEP),
new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
Expand All @@ -111,6 +112,8 @@ private List<StepKey> expectedStepKeys(String phase, boolean forceMergeIndex) {
new StepKey(phase, NAME, WaitForIndexColorStep.NAME),
new StepKey(phase, NAME, CopyExecutionStateStep.NAME),
new StepKey(phase, NAME, CopySettingsStep.NAME),
hasReplicateFor ? new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME) : null,
hasReplicateFor ? new StepKey(phase, NAME, UpdateSettingsStep.NAME) : null,
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY),
new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME),
new StepKey(phase, NAME, DeleteStep.NAME),
Expand Down

0 comments on commit 8216ecb

Please sign in to comment.