Skip to content

Commit

Permalink
Unifying isRemoteSegmentStoreInSync in IndexShard
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Jan 23, 2024
1 parent e034a50 commit 2b7363f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 40 deletions.
41 changes: 24 additions & 17 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2028,32 +2028,39 @@ private RemoteSegmentStoreDirectory getRemoteDirectory() {
* is in sync with local
*/
boolean isRemoteSegmentStoreInSync() {
return isRemoteSegmentStoreInSync(true);
}

boolean isRemoteSegmentStoreInSync(boolean verifyMetadata) {
assert indexSettings.isRemoteStoreEnabled();
try {
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
if (directory.readLatestMetadataFile() != null) {
Collection<String> uploadFiles = directory.getSegmentsUploadedToRemoteStore().keySet();
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = getSegmentInfosSnapshot()) {
Collection<String> localSegmentInfosFiles = segmentInfosGatedCloseable.get().files(true);
Set<String> localFiles = new HashSet<>(localSegmentInfosFiles);
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
localFiles.removeAll(RemoteStoreRefreshListener.EXCLUDE_FILES);
if (uploadFiles.containsAll(localFiles)) {
return true;
}
logger.debug(
() -> new ParameterizedMessage(
"RemoteSegmentStoreSyncStatus localSize={} remoteSize={}",
localFiles.size(),
uploadFiles.size()
)
);
if (verifyMetadata && directory.readLatestMetadataFile() == null) {
return false;
}

Collection<String> uploadFiles = directory.getSegmentsUploadedToRemoteStore().keySet();
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = getSegmentInfosSnapshot()) {
Collection<String> localSegmentInfosFiles = segmentInfosGatedCloseable.get().files(true);
Set<String> localFiles = new HashSet<>(localSegmentInfosFiles);
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
localFiles.removeAll(RemoteStoreRefreshListener.EXCLUDE_FILES);
if (uploadFiles.containsAll(localFiles)) {
return true;
}
logger.debug(
() -> new ParameterizedMessage(
"RemoteSegmentStoreSyncStatus localSize={} remoteSize={}",
localFiles.size(),
uploadFiles.size()
)
);
}
} catch (Throwable e) {
logger.error("Exception while reading latest metadata", e);
}
return false;

}

public void preRecovery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,30 +172,13 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe()
|| isRemoteSegmentStoreInSync() == false;
|| indexShard.isRemoteSegmentStoreInSync(false) == false;
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
}
return this.primaryTerm != indexShard.getOperationPrimaryTerm();
}

/**
* Checks if all files present in local store are uploaded to remote store or part of excluded files.
*
* Different from IndexShard#isRemoteSegmentStoreInSync as it uses files uploaded cache in RemoteDirectory
* And it doesn't make a remote store call.
*
* @return true iff all the local files are uploaded to remote store.
*/
boolean isRemoteSegmentStoreInSync() {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
return segmentInfosGatedCloseable.get().files(true).stream().allMatch(this::skipUpload);
} catch (Throwable throwable) {
logger.error("Throwable thrown during isRemoteSegmentStoreInSync", throwable);
}
return false;
}

/*
@return false if retry is needed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception {
RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2();
RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
assertNoLagAndTotalUploadsFailed(segmentTracker, 0);
assertTrue("remote store in sync", tuple.v1().isRemoteSegmentStoreInSync());
assertTrue("remote store in sync", indexShard.isRemoteSegmentStoreInSync());
}

public void testRefreshSuccessOnSecondAttempt() throws Exception {
Expand Down Expand Up @@ -416,7 +416,7 @@ public void testRefreshPersistentFailure() throws Exception {
);
// Giving 10ms for some iterations of remote refresh upload
Thread.sleep(10);
assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());
assertFalse("remote store should not in sync", indexShard.isRemoteSegmentStoreInSync(false));
}

private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception {
Expand Down Expand Up @@ -611,7 +611,7 @@ private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentSto
}
}
}
assertTrue(remoteStoreRefreshListener.isRemoteSegmentStoreInSync());
assertTrue(indexShard.isRemoteSegmentStoreInSync(false));
}

public void testRemoteSegmentStoreNotInSync() throws IOException {
Expand All @@ -621,7 +621,6 @@ public void testRemoteSegmentStoreNotInSync() throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
verifyUploadedSegments(remoteSegmentStoreDirectory);
remoteStoreRefreshListener.isRemoteSegmentStoreInSync();
boolean oneFileDeleted = false;
// Delete any one file from remote store
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Expand All @@ -634,7 +633,7 @@ public void testRemoteSegmentStoreNotInSync() throws IOException {
}
}
}
assertFalse(remoteStoreRefreshListener.isRemoteSegmentStoreInSync());
assertFalse(indexShard.isRemoteSegmentStoreInSync(false));
}
}

Expand Down

0 comments on commit 2b7363f

Please sign in to comment.