Skip to content

Commit

Permalink
[Snapshot Interop] Fix Flakiness in Snapshot Interop Code
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Sep 6, 2023
1 parent 9602d1d commit 8f7a75e
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
Expand All @@ -26,14 +25,17 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
Expand Down Expand Up @@ -128,32 +130,22 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut

internalCluster().startDataOnlyNode();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1, indexName2)));
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));

updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
CreateSnapshotResponse createSnapshotResponse2 = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo2 = createSnapshot(
snapshotRepoName,
snapshotName2,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));

DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
Expand Down Expand Up @@ -257,7 +249,6 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut
assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1 + 2);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9326")
public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String primary = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -288,32 +279,24 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {

internalCluster().startDataOnlyNode();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo1 = createSnapshot(
snapshotRepoName,
snapshotName1,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
CreateSnapshotResponse createSnapshotResponse2 = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo2 = createSnapshot(
snapshotRepoName,
snapshotName2,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));

DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
Expand Down Expand Up @@ -341,6 +324,10 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
ensureGreen(indexName1, restoredIndexName2);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1);
assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2);
// indexing some new docs and validating
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2);

// deleting data for restoredIndexName1 and restoring from remote store.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
Expand All @@ -355,9 +342,9 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
ensureGreen(indexName1);
assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + 2);
indexDocuments(client, indexName1, numDocsInIndex1 + 2, numDocsInIndex1 + 4);
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 4);
}

public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException {
Expand Down Expand Up @@ -391,18 +378,14 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
internalCluster().startDataOnlyNode();

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo1 = createSnapshot(
snapshotRepoName,
snapshotName1,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

Settings remoteStoreIndexSettings = Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStoreRepo2Name)
Expand Down Expand Up @@ -478,18 +461,10 @@ public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionExce
ensureGreen(indexName1);

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(List.of(indexName1)));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

createRepository(BASE_REMOTE_REPO, "fs", absolutePath2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.repositories.blobstore;

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexModule;
Expand All @@ -29,16 +31,21 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class BlobStoreRepositoryHelperTests extends OpenSearchSingleNodeTestCase {

Expand Down Expand Up @@ -122,6 +129,35 @@ protected Settings getRemoteStoreBackedIndexSettings(String remoteStoreRepo) {
.build();
}

protected SnapshotInfo createSnapshot(String repositoryName, String snapshot, List<String> indices) {
logger.info("--> creating snapshot [{}] of {} in [{}]", snapshot, indices, repositoryName);
int num_retries = 0;
final String MD_FILE_NOT_FOUND_ISSUE_PREFIX = "Metadata file is not present for given primary term";
SnapshotInfo snapshotInfo = null;
while (num_retries++ < 3) {
final CreateSnapshotResponse response = client().admin()
.cluster()
.prepareCreateSnapshot(repositoryName, snapshot)
.setIndices(indices.toArray(Strings.EMPTY_ARRAY))
.setWaitForCompletion(true)
.get();
snapshotInfo = response.getSnapshotInfo();
// currently, if for some reason during snapshot creation, md for the snapshot commit is not
// uploaded to remote store, we fail the snapshot for the shard, so that user can retry the snapshot.
// if it happens during test runs, retrying it for 3 times, to fix the snapshot.
if (snapshotInfo.state() != SnapshotState.PARTIAL
&& !snapshotInfo.shardFailures().get(0).reason().contains(MD_FILE_NOT_FOUND_ISSUE_PREFIX)) {
break;
}
logger.info("--> delete snapshot");
client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot).get();
}
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.failedShards(), equalTo(0));
return snapshotInfo;
}

protected void indexDocuments(Client client, String indexName) {
int numDocs = randomIntBetween(10, 20);
for (int i = 0; i < numDocs; i++) {
Expand Down
Loading

0 comments on commit 8f7a75e

Please sign in to comment.