Read the same metadata file that is locked during restore of shallow snapshot (opensearch-project#10979)

Signed-off-by: Sachin Kale <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
sachinpkale authored and shiv0408 committed Apr 25, 2024
1 parent 4f29d7e commit 1e78ed9
Showing 5 changed files with 93 additions and 4 deletions.
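In short: when a shard is restored from a shallow snapshot, the remote segment store directory is now initialized from the exact metadata file that the snapshot locked (the snapshot UUID acts as the lock acquirer id), rather than from whichever metadata file happens to match the primary term and commit generation. Below is a rough, self-contained sketch of that lock-based lookup, not the committed code; the lock-file naming scheme ("<metadataFile>::<acquirerId>.lock") and the helper names are invented for the example, while the real logic lives in RemoteStoreMetadataLockManager.fetchLock and FileLockInfo.LockFileUtils.

import java.io.FileNotFoundException;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Simplified model of the lock-based metadata lookup this commit introduces.
 * Lock-file names are hypothetical; OpenSearch's real format is handled by FileLockInfo.LockFileUtils.
 */
public class LockedMetadataLookupSketch {

    /** Resolve the metadata file pinned by the lock of the given acquirer (e.g. a snapshot UUID). */
    static String fetchLockedMetadataFile(List<String> lockFiles, String metadataFilePrefix, String acquirerId)
        throws FileNotFoundException {
        List<String> matches = lockFiles.stream()
            .filter(lock -> lock.startsWith(metadataFilePrefix))                          // same primary term + generation
            .filter(lock -> acquirerId.equals(lock.split("::")[1].replace(".lock", "")))  // locked by this acquirer
            .map(lock -> lock.split("::")[0])                                             // the file the lock points at
            .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new FileNotFoundException("No lock file for prefix " + metadataFilePrefix + " and acquirer " + acquirerId);
        }
        assert matches.size() == 1 : "a commit is expected to be locked at most once per acquirer";
        return matches.get(0);
    }

    public static void main(String[] args) throws FileNotFoundException {
        // The shard kept indexing after the snapshot, so newer metadata files exist in the remote store,
        // but only the snapshotted commit's metadata file carries a lock for this snapshot's UUID.
        List<String> lockFiles = List.of("metadata__2__5__10::snap-uuid-1.lock");
        String pinned = fetchLockedMetadataFile(lockFiles, "metadata__2__5", "snap-uuid-1");
        System.out.println("Restore reads: " + pinned);   // metadata__2__5__10, regardless of later writes
    }
}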
@@ -589,4 +589,71 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, InterruptedException {
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionException, InterruptedException {
String indexName1 = "testindex1";
String snapshotRepoName = "test-restore-snapshot-repo";
String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX;
String snapshotName1 = "test-restore-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
Path absolutePath2 = randomRepoPath().toAbsolutePath();
String[] pathTokens = absolutePath1.toString().split("/");
String basePath = pathTokens[pathTokens.length - 1];
Arrays.copyOf(pathTokens, pathTokens.length - 1);
Path location = PathUtils.get(String.join("/", pathTokens));
pathTokens = absolutePath2.toString().split("/");
String basePath2 = pathTokens[pathTokens.length - 1];
Arrays.copyOf(pathTokens, pathTokens.length - 1);
Path location2 = PathUtils.get(String.join("/", pathTokens));
logger.info("Path 1 [{}]", absolutePath1);
logger.info("Path 2 [{}]", absolutePath2);
String restoredIndexName1 = indexName1 + "-restored";

createRepository(snapshotRepoName, "fs", getRepositorySettings(location, basePath, true));

Client client = client();
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
createIndex(indexName1, indexSettings);

int numDocsInIndex1 = randomIntBetween(2, 5);
indexDocuments(client, indexName1, numDocsInIndex1);

ensureGreen(indexName1);

logger.info("--> snapshot");
SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(List.of(indexName1)));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

int extraNumDocsInIndex1 = randomIntBetween(20, 50);
indexDocuments(client, indexName1, extraNumDocsInIndex1);
refresh(indexName1);

client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get();
createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1)
.setSourceRemoteStoreRepository(remoteStoreRepoNameUpdated)
.get();

assertTrue(restoreSnapshotResponse2.getRestoreInfo().failedShards() == 0);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);

// indexing some new docs and validating
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

}
@@ -4908,8 +4908,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
remoteStore.incRef();
}
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
-    .initializeToSpecificCommit(primaryTerm, commitGeneration)
-    .getMetadata();
+    .getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();

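Note on the hunk above: syncSegmentsFromGivenRemoteSegmentStore no longer initializes the source directory itself; it now reads the uploaded-segments map that was cached when the recovery path (next hunk) called initializeToSpecificCommit on the locked metadata file. A toy, self-contained sketch of that initialize-then-read split, with made-up types and values:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy stand-in for the remote segment store directory's caching behaviour relied on by this change. */
class RemoteDirectorySketch {
    private Map<String, String> segmentsUploadedToRemoteStore = Map.of();

    /** Stand-in for initializeToSpecificCommit: read one (locked) metadata file and cache its contents. */
    void initializeToCommit(Map<String, String> metadataFromLockedFile) {
        this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(metadataFromLockedFile);
    }

    /** Stand-in for getSegmentsUploadedToRemoteStore: later callers, such as segment sync, read the cache. */
    Map<String, String> getSegmentsUploadedToRemoteStore() {
        return segmentsUploadedToRemoteStore;
    }

    public static void main(String[] args) {
        RemoteDirectorySketch dir = new RemoteDirectorySketch();
        dir.initializeToCommit(Map.of("_0.si", "checksum-abc"));      // done once during recovery
        System.out.println(dir.getSegmentsUploadedToRemoteStore());   // read afterwards during segment sync
    }
}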
@@ -410,6 +410,11 @@ void recoverFromSnapshotAndRemoteStore(
indexUUID,
shardId
);
sourceRemoteDirectory.initializeToSpecificCommit(
primaryTerm,
commitGeneration,
recoverySource.snapshot().getSnapshotId().getUUID()
);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
final Store store = indexShard.store();
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
@@ -34,6 +34,7 @@
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
@@ -160,8 +161,9 @@ public RemoteSegmentMetadata init() throws IOException {
*
* @throws IOException if there were any failures in reading the metadata file
*/
- public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration) throws IOException {
-     String metadataFile = getMetadataFileForCommit(primaryTerm, commitGeneration);
+ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration, String acquirerId) throws IOException {
+     String metadataFilePrefix = MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, commitGeneration);
+     String metadataFile = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLock(metadataFilePrefix, acquirerId);
RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile);
if (remoteSegmentMetadata != null) {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
@@ -14,10 +14,13 @@
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to
@@ -70,6 +73,19 @@ public void release(LockInfo lockInfo) throws IOException {
}
}

public String fetchLock(String filenamePrefix, String acquirerId) throws IOException {
Collection<String> lockFiles = lockDirectory.listFilesByPrefix(filenamePrefix);
List<String> lockFilesForAcquirer = lockFiles.stream()
.filter(lockFile -> acquirerId.equals(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lockFile)))
.map(FileLockInfo.LockFileUtils::getFileToLockNameFromLock)
.collect(Collectors.toList());
if (lockFilesForAcquirer.size() == 0) {
throw new FileNotFoundException("No lock file found for prefix: " + filenamePrefix + " and acquirerId: " + acquirerId);
}
assert lockFilesForAcquirer.size() == 1;
return lockFilesForAcquirer.get(0);
}

/**
* Checks whether a given file have any lock on it or not.
* @param lockInfo File Lock Info instance for which we need to check if lock is acquired.