Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Snapshot Interop] Add Changes in Restore Snapshot Flow for remote store Interoperability #6788

Merged
merged 11 commits into from
Jun 27, 2023
Prev Previous commit
Next Next commit
Address PR Comments
Signed-off-by: Harish Bhakuni <[email protected]>
Harish Bhakuni committed Jun 27, 2023
commit 7997570d136299b58423c2a53d105f68a2ca4708
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
package org.opensearch.snapshots;

import org.opensearch.action.ActionFuture;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
@@ -41,14 +42,14 @@
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
import org.opensearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
@@ -69,7 +70,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -173,8 +173,18 @@ public void testParallelRestoreOperations() {
assertThat(client.prepareGet(restoredIndexName2, docId2).get().isExists(), equalTo(true));
}

public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException, ExecutionException, InterruptedException {
internalCluster().startNodes(3);
public void testRestoreRemoteStoreIndicesWithRemoteTranslog() throws IOException, ExecutionException, InterruptedException {
testRestoreOperationsShallowCopyEnabled(true);
}

public void testRestoreRemoteStoreIndicesWithoutRemoteTranslog() throws IOException, ExecutionException, InterruptedException {
testRestoreOperationsShallowCopyEnabled(false);
}

public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnabled) throws IOException, ExecutionException,
InterruptedException {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startNode();
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-restore-snapshot-repo";
@@ -194,7 +204,7 @@ public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException
createRepository(remoteStoreRepoName, "fs", absolutePath2);

Client client = client();
Settings indexSettings = getIndexSettings(true, randomBoolean(), remoteStoreRepoName, 1, 0).build();
Settings indexSettings = getIndexSettings(true, remoteTranslogEnabled, remoteStoreRepoName, 1, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(false, false, null, 1, 0).build();
@@ -206,6 +216,7 @@ public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

final String secondNode = internalCluster().startNode();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
@@ -234,6 +245,11 @@ public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException
);
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + randomIntBetween(2, 5));
ensureGreen(indexName1);

RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
@@ -257,7 +273,8 @@ public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException
assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2);

// deleting data for restoredIndexName1 and restoring from remote store.
stopNodeWithPrimaryShard(restoredIndexName1);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(restoredIndexName1)));
ensureRed(restoredIndexName1);
assertAcked(client().admin().indices().prepareClose(restoredIndexName1));
client().admin()
.cluster()
@@ -329,8 +346,111 @@ public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException
assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1 + 2);
}

public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startNode();
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-restore-snapshot-repo";
String remoteStoreRepoName = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX;
String snapshotName1 = "test-restore-snapshot1";
String snapshotName2 = "test-restore-snapshot2";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
Path absolutePath2 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);
logger.info("Remote Store Repo Path [{}]", absolutePath2);
String restoredIndexName2 = indexName2 + "-restored";

boolean enableShallowCopy = randomBoolean();
createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, enableShallowCopy));
createRepository(remoteStoreRepoName, "fs", absolutePath2);

Client client = client();
Settings indexSettings = getIndexSettings(true, randomBoolean(), remoteStoreRepoName, 1, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(false, false, null, 1, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 5;
final int numDocsInIndex2 = 6;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

final String secondNode = internalCluster().startNode();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
CreateSnapshotResponse createSnapshotResponse2 = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + randomIntBetween(2, 5));
ensureGreen(indexName1);

assertAcked(client().admin().indices().prepareClose(indexName1));

RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(false)
.setIndices(indexName1)
.get();
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(false)
.setIndices(indexName2)
.setRenamePattern(indexName2)
.setRenameReplacement(restoredIndexName2)
.get();
assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED);
assertEquals(restoreSnapshotResponse2.status(), RestStatus.ACCEPTED);
ensureGreen(indexName1, restoredIndexName2);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1);
assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2);

// deleting data for restoredIndexName1 and restoring from remote store.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(indexName1)));
ensureRed(indexName1);
assertAcked(client().admin().indices().prepareClose(indexName1));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1), PlainActionFuture.newFuture());
ensureYellowAndNoInitializingShards(indexName1);
ensureGreen(indexName1);
assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2);
}

public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException {
internalCluster().startNodes(3);
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startNode();
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-restore-snapshot-repo";
@@ -342,7 +462,7 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
Path absolutePath3 = randomRepoPath().toAbsolutePath();
String restoredIndexName1 = indexName1 + "-restored";

createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));
createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
createRepository(remoteStoreRepoName, "fs", absolutePath2);
createRepository(remoteStoreRepo2Name, "fs", absolutePath3);

@@ -359,6 +479,8 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

final String secondNode = internalCluster().startNode();

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
@@ -391,7 +513,7 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1);

// deleting data for restoredIndexName1 and restoring from remote store.
stopNodeWithPrimaryShard(restoredIndexName1);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(restoredIndexName1)));
assertAcked(client().admin().indices().prepareClose(restoredIndexName1));
client().admin()
.cluster()
@@ -431,7 +553,6 @@ private Settings.Builder getIndexSettings(
}

public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionException, InterruptedException {
internalCluster().startNodes(3);
String indexName1 = "testindex1";
String snapshotRepoName = "test-restore-snapshot-repo";
String remoteStoreRepoName = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX;
@@ -524,27 +645,6 @@ public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionExce
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

private void stopNodeWithPrimaryShard(String indexName) throws IOException {
String primaryNodeName = primaryNodeName(indexName);
String clusterManagerNodeName = internalCluster().getClusterManagerName();
if (!Objects.equals(primaryNodeName, clusterManagerNodeName)) {
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName));
} else {
// if the node on which shard assigned is cluster manager node,
// we reroute the shard to other node and then stop node.
String nodeOtherThanClusterManager = Arrays.stream(internalCluster().getNodeNames())
.filter(nodeName -> !Objects.equals(nodeName, clusterManagerNodeName))
.findFirst()
.get();
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("index1", 0, primaryNodeName, nodeOtherThanClusterManager))
.get();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(indexName)));
}
}

private void indexDocuments(Client client, String indexName, int numOfDocs) {
indexDocuments(client, indexName, 0, numOfDocs);
}
Loading