From 8216ecbad0ace60a03e18dcd5ac2d126ed891539 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 16 Dec 2024 15:14:45 -0500 Subject: [PATCH] Wire waiting step into searchable_snapshot action --- .../core/ilm/SearchableSnapshotAction.java | 20 +++++++++++++++++-- .../ilm/SearchableSnapshotActionTests.java | 7 +++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index 8b6fedee4552f..25ece84f75e23 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.license.LicenseUtils; @@ -171,6 +172,8 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac StepKey swapAliasesKey = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME); StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME); StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME); + StepKey replicateForKey = new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME); + StepKey dropReplicasKey = new StepKey(phase, NAME, UpdateSettingsStep.NAME); // Before going through all these steps, first check if we need to do them at all. For example, the index could already be // a searchable snapshot of the same type and repository, in which case we don't need to do anything. If that is detected, @@ -345,7 +348,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac getRestoredIndexPrefix(mountSnapshotKey), storageType, totalShardsPerNode, - 0 + replicateFor != null ? 1 : 0 // if the 'replicate_for' option is set, then have a replica, otherwise don't ); WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep( waitForGreenRestoredIndexKey, @@ -353,11 +356,12 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ClusterHealthStatus.GREEN, getRestoredIndexPrefix(waitForGreenRestoredIndexKey) ); + StepKey keyForReplicateForOrContinue = replicateFor != null ? replicateForKey : nextStepKey; CopyExecutionStateStep copyMetadataStep = new CopyExecutionStateStep( copyMetadataKey, copyLifecyclePolicySettingKey, (index, executionState) -> getRestoredIndexPrefix(copyMetadataKey) + index, - nextStepKey + keyForReplicateForOrContinue ); CopySettingsStep copySettingsStep = new CopySettingsStep( copyLifecyclePolicySettingKey, @@ -390,6 +394,14 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac getRestoredIndexPrefix(swapAliasesKey) ); + Step replicateForStep = new WaitUntilReplicateForTimePassesStep(replicateForKey, dropReplicasKey, replicateFor); + UpdateSettingsStep dropRelicasStep = new UpdateSettingsStep( + dropReplicasKey, + nextStepKey, + client, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + List steps = new ArrayList<>(); steps.add(conditionalSkipActionStep); steps.add(checkNoWriteIndexStep); @@ -408,6 +420,10 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac steps.add(waitForGreenIndexHealthStep); steps.add(copyMetadataStep); steps.add(copySettingsStep); + if (replicateFor != null) { + steps.add(replicateForStep); + steps.add(dropRelicasStep); + } steps.add(isDataStreamBranchingStep); steps.add(replaceDataStreamBackingIndex); steps.add(deleteSourceIndexStep); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java index c681e3e93c26f..c6119b272a8c0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java @@ -33,7 +33,8 @@ public void testToSteps() { List steps = action.toSteps(null, phase, nextStepKey, null); - List expectedSteps = expectedStepKeys(phase, action.isForceMergeIndex()); + List expectedSteps = expectedStepKeys(phase, action.isForceMergeIndex(), action.getReplicateFor() != null); + assertThat(steps.size(), is(expectedSteps.size())); for (int i = 0; i < expectedSteps.size(); i++) { assertThat("steps match expectation at index " + i, steps.get(i).getKey(), is(expectedSteps.get(i))); @@ -94,7 +95,7 @@ public void testCreateWithInvalidTotalShardsPerNode() { assertEquals("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1", exception.getMessage()); } - private List expectedStepKeys(String phase, boolean forceMergeIndex) { + private List expectedStepKeys(String phase, boolean forceMergeIndex, boolean hasReplicateFor) { return Stream.of( new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_ACTION_STEP), new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME), @@ -111,6 +112,8 @@ private List expectedStepKeys(String phase, boolean forceMergeIndex) { new StepKey(phase, NAME, WaitForIndexColorStep.NAME), new StepKey(phase, NAME, CopyExecutionStateStep.NAME), new StepKey(phase, NAME, CopySettingsStep.NAME), + hasReplicateFor ? new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME) : null, + hasReplicateFor ? new StepKey(phase, NAME, UpdateSettingsStep.NAME) : null, new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY), new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME), new StepKey(phase, NAME, DeleteStep.NAME),