Skip to content

Commit

Permalink
[Snapshot Interop] Fix Flakiness in Snapshot Interop Code
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Sep 7, 2023
1 parent 76f1b52 commit 3f006ae
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
Expand All @@ -26,6 +25,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -34,7 +34,9 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
Expand Down Expand Up @@ -129,32 +131,21 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut

internalCluster().startDataOnlyNode();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1, indexName2)));
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));

updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
CreateSnapshotResponse createSnapshotResponse2 = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo2 = createSnapshot(
snapshotRepoName,
snapshotName2,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));

DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
Expand Down Expand Up @@ -258,7 +249,6 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut
assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1 + 2);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9326")
public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String primary = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -289,32 +279,24 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {

internalCluster().startDataOnlyNode();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo1 = createSnapshot(
snapshotRepoName,
snapshotName1,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
CreateSnapshotResponse createSnapshotResponse2 = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo2 = createSnapshot(
snapshotRepoName,
snapshotName2,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));

DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
Expand Down Expand Up @@ -342,6 +324,10 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
ensureGreen(indexName1, restoredIndexName2);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1);
assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2);
// indexing some new docs and validating
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2);

// deleting data for restoredIndexName1 and restoring from remote store.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
Expand All @@ -356,9 +342,9 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
ensureGreen(indexName1);
assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + 2);
indexDocuments(client, indexName1, numDocsInIndex1 + 2, numDocsInIndex1 + 4);
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 4);
}

public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException {
Expand Down Expand Up @@ -392,18 +378,14 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
internalCluster().startDataOnlyNode();

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo1 = createSnapshot(
snapshotRepoName,
snapshotName1,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

Settings remoteStoreIndexSettings = Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStoreRepo2Name)
Expand Down Expand Up @@ -479,18 +461,10 @@ public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionExce
ensureGreen(indexName1);

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(List.of(indexName1)));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

createRepository(BASE_REMOTE_REPO, "fs", absolutePath2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.repositories.blobstore;

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexModule;
Expand All @@ -29,16 +31,21 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class BlobStoreRepositoryHelperTests extends OpenSearchSingleNodeTestCase {

Expand Down Expand Up @@ -122,13 +129,28 @@ protected Settings getRemoteStoreBackedIndexSettings() {
.build();
}

protected SnapshotInfo createSnapshot(String repositoryName, String snapshot, List<String> indices) {
logger.info("--> creating snapshot [{}] of {} in [{}]", snapshot, indices, repositoryName);

final CreateSnapshotResponse response = client().admin()
.cluster()
.prepareCreateSnapshot(repositoryName, snapshot)
.setIndices(indices.toArray(Strings.EMPTY_ARRAY))
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = response.getSnapshotInfo();
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.failedShards(), equalTo(0));
return snapshotInfo;
}

protected void indexDocuments(Client client, String indexName) {
int numDocs = randomIntBetween(10, 20);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get();
}
client.admin().indices().prepareFlush(indexName).get();
}

protected IndexSettings getIndexSettings(String indexName) {
Expand Down
Loading

0 comments on commit 3f006ae

Please sign in to comment.