Skip to content

Commit

Permalink
Add number of replicas to the mount-snapshot step
Browse files Browse the repository at this point in the history
note that for now the value is always just 0
  • Loading branch information
joegallo committed Dec 13, 2024
1 parent c6b2d36 commit c6e1951
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {
private final MountSearchableSnapshotRequest.Storage storageType;
@Nullable
private final Integer totalShardsPerNode;
private final int replicas;

public MountSnapshotStep(
StepKey key,
StepKey nextStepKey,
Client client,
String restoredIndexPrefix,
MountSearchableSnapshotRequest.Storage storageType,
@Nullable Integer totalShardsPerNode
@Nullable Integer totalShardsPerNode,
int replicas
) {
super(key, nextStepKey, client);
this.restoredIndexPrefix = restoredIndexPrefix;
Expand All @@ -57,6 +59,10 @@ public MountSnapshotStep(
throw new IllegalArgumentException("[" + SearchableSnapshotAction.TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1");
}
this.totalShardsPerNode = totalShardsPerNode;

// this isn't directly settable by the user, so validation by assertion is sufficient
assert replicas >= 0 : "number of replicas must be gte zero, but was [" + replicas + "]";
this.replicas = replicas;
}

@Override
Expand All @@ -77,6 +83,10 @@ public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}

public int getReplicas() {
return replicas;
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
String indexName = indexMetadata.getIndex().getName();
Expand Down Expand Up @@ -152,11 +162,13 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
}

final Settings.Builder settingsBuilder = Settings.builder();

overrideTierPreference(this.getKey().phase()).ifPresent(override -> settingsBuilder.put(DataTier.TIER_PREFERENCE, override));
if (totalShardsPerNode != null) {
settingsBuilder.put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), totalShardsPerNode);
}
if (replicas > 0) {
settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicas);
}

final MountSearchableSnapshotRequest mountSearchableSnapshotRequest = new MountSearchableSnapshotRequest(
TimeValue.MAX_VALUE,
Expand Down Expand Up @@ -235,7 +247,7 @@ String[] ignoredIndexSettings() {

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType, totalShardsPerNode);
return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType, totalShardsPerNode, replicas);
}

@Override
Expand All @@ -250,6 +262,7 @@ public boolean equals(Object obj) {
return super.equals(obj)
&& Objects.equals(restoredIndexPrefix, other.restoredIndexPrefix)
&& Objects.equals(storageType, other.storageType)
&& Objects.equals(totalShardsPerNode, other.totalShardsPerNode);
&& Objects.equals(totalShardsPerNode, other.totalShardsPerNode)
&& Objects.equals(replicas, other.replicas);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
client,
getRestoredIndexPrefix(mountSnapshotKey),
storageType,
totalShardsPerNode
totalShardsPerNode,
0
);
WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep(
waitForGreenRestoredIndexKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public MountSnapshotStep createRandomInstance() {
String restoredIndexPrefix = randomAlphaOfLength(10);
MountSearchableSnapshotRequest.Storage storage = randomStorageType();
Integer totalShardsPerNode = randomTotalShardsPerNode(true);
return new MountSnapshotStep(stepKey, nextStepKey, client, restoredIndexPrefix, storage, totalShardsPerNode);
return new MountSnapshotStep(stepKey, nextStepKey, client, restoredIndexPrefix, storage, totalShardsPerNode, 0);
}

public static MountSearchableSnapshotRequest.Storage randomStorageType() {
Expand All @@ -62,7 +62,8 @@ protected MountSnapshotStep copyInstance(MountSnapshotStep instance) {
instance.getClient(),
instance.getRestoredIndexPrefix(),
instance.getStorage(),
instance.getTotalShardsPerNode()
instance.getTotalShardsPerNode(),
instance.getReplicas()
);
}

Expand All @@ -73,7 +74,8 @@ public MountSnapshotStep mutateInstance(MountSnapshotStep instance) {
String restoredIndexPrefix = instance.getRestoredIndexPrefix();
MountSearchableSnapshotRequest.Storage storage = instance.getStorage();
Integer totalShardsPerNode = instance.getTotalShardsPerNode();
switch (between(0, 4)) {
int replicas = instance.getReplicas();
switch (between(0, 5)) {
case 0:
key = new StepKey(key.phase(), key.action(), key.name() + randomAlphaOfLength(5));
break;
Expand All @@ -95,10 +97,13 @@ public MountSnapshotStep mutateInstance(MountSnapshotStep instance) {
case 4:
totalShardsPerNode = totalShardsPerNode == null ? 1 : totalShardsPerNode + randomIntBetween(1, 100);
break;
case 5:
replicas = replicas == 0 ? 1 : 0; // swap between 0 and 1
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new MountSnapshotStep(key, nextKey, instance.getClient(), restoredIndexPrefix, storage, totalShardsPerNode);
return new MountSnapshotStep(key, nextKey, instance.getClient(), restoredIndexPrefix, storage, totalShardsPerNode, replicas);
}

public void testCreateWithInvalidTotalShardsPerNode() throws Exception {
Expand All @@ -112,7 +117,8 @@ public void testCreateWithInvalidTotalShardsPerNode() throws Exception {
client,
RESTORED_INDEX_PREFIX,
randomStorageType(),
invalidTotalShardsPerNode
invalidTotalShardsPerNode,
0
)
);
assertEquals("[total_shards_per_node] must be >= 1", exception.getMessage());
Expand Down Expand Up @@ -197,15 +203,17 @@ public void testPerformAction() throws Exception {
RESTORED_INDEX_PREFIX,
indexName,
new String[] { LifecycleSettings.LIFECYCLE_NAME },
null
null,
0
);
MountSnapshotStep step = new MountSnapshotStep(
randomStepKey(),
randomStepKey(),
client,
RESTORED_INDEX_PREFIX,
randomStorageType(),
null
null,
0
);
performActionAndWait(step, indexMetadata, clusterState, null);
}
Expand Down Expand Up @@ -241,7 +249,8 @@ public void testResponseStatusHandling() throws Exception {
clientPropagatingOKResponse,
RESTORED_INDEX_PREFIX,
randomStorageType(),
null
null,
0
);
performActionAndWait(step, indexMetadata, clusterState, null);
}
Expand All @@ -257,7 +266,8 @@ public void testResponseStatusHandling() throws Exception {
clientPropagatingACCEPTEDResponse,
RESTORED_INDEX_PREFIX,
randomStorageType(),
null
null,
0
);
performActionAndWait(step, indexMetadata, clusterState, null);
}
Expand Down Expand Up @@ -324,15 +334,17 @@ private void doTestMountWithoutSnapshotIndexNameInState(String prefix) throws Ex
RESTORED_INDEX_PREFIX,
indexNameSnippet,
new String[] { LifecycleSettings.LIFECYCLE_NAME },
null
null,
0
);
MountSnapshotStep step = new MountSnapshotStep(
randomStepKey(),
randomStepKey(),
client,
RESTORED_INDEX_PREFIX,
randomStorageType(),
null
null,
0
);
performActionAndWait(step, indexMetadata, clusterState, null);
}
Expand Down Expand Up @@ -369,15 +381,17 @@ public void testIgnoreTotalShardsPerNodeInFrozenPhase() throws Exception {
new String[] {
LifecycleSettings.LIFECYCLE_NAME,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey() },
null
null,
0
);
MountSnapshotStep step = new MountSnapshotStep(
new StepKey(TimeseriesLifecycleType.FROZEN_PHASE, randomAlphaOfLength(10), randomAlphaOfLength(10)),
randomStepKey(),
client,
RESTORED_INDEX_PREFIX,
randomStorageType(),
null
null,
0
);
performActionAndWait(step, indexMetadata, clusterState, null);
}
Expand All @@ -404,6 +418,7 @@ public void testDoNotIgnoreTotalShardsPerNodeAndReplicasIfSet() throws Exception
.build();

final Integer totalShardsPerNode = randomTotalShardsPerNode(false);
final int replicas = randomIntBetween(1, 5);

try (var threadPool = createThreadPool()) {
final var client = getRestoreSnapshotRequestAssertingClient(
Expand All @@ -414,15 +429,17 @@ public void testDoNotIgnoreTotalShardsPerNodeAndReplicasIfSet() throws Exception
RESTORED_INDEX_PREFIX,
indexName,
new String[] { LifecycleSettings.LIFECYCLE_NAME },
totalShardsPerNode
totalShardsPerNode,
replicas
);
MountSnapshotStep step = new MountSnapshotStep(
new StepKey(TimeseriesLifecycleType.FROZEN_PHASE, randomAlphaOfLength(10), randomAlphaOfLength(10)),
randomStepKey(),
client,
RESTORED_INDEX_PREFIX,
randomStorageType(),
totalShardsPerNode
totalShardsPerNode,
replicas
);
performActionAndWait(step, indexMetadata, clusterState, null);
}
Expand Down Expand Up @@ -451,7 +468,8 @@ private NoOpClient getRestoreSnapshotRequestAssertingClient(
String restoredIndexPrefix,
String expectedSnapshotIndexName,
String[] expectedIgnoredIndexSettings,
@Nullable Integer totalShardsPerNode
@Nullable Integer totalShardsPerNode,
int replicas
) {
return new NoOpClient(threadPool) {
@Override
Expand Down Expand Up @@ -487,6 +505,18 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
);
}

if (replicas > 0) {
Integer numberOfReplicasSettingValue = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(
mountSearchableSnapshotRequest.indexSettings()
);
assertThat(numberOfReplicasSettingValue, is(replicas));
} else {
assertThat(
mountSearchableSnapshotRequest.indexSettings().hasValue(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey()),
is(false)
);
}

// invoke the awaiting listener with a very generic 'response', just to fulfill the contract
listener.onResponse((Response) new RestoreSnapshotResponse((RestoreInfo) null));
}
Expand Down

0 comments on commit c6e1951

Please sign in to comment.