Skip to content

Commit

Permalink
Fixing bug and Adding UT
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Jan 5, 2024
1 parent 856e4e9 commit f5d33c4
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -951,8 +951,8 @@ public IndexResult index(Index index) throws IOException {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(
index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm()))
;
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
);
}
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
if (indexResult.getTranslogLocation() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2041,7 +2041,13 @@ boolean isRemoteSegmentStoreInSync() {
if (uploadFiles.containsAll(localFiles)) {
return true;
}
logger.debug(() -> new ParameterizedMessage("RemoteSegmentStoreSyncStatus localSize={} remoteSize={}", localFiles.size(), uploadFiles.size()));
logger.debug(
() -> new ParameterizedMessage(

Check warning on line 2045 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L2044-L2045

Added lines #L2044 - L2045 were not covered by tests
"RemoteSegmentStoreSyncStatus localSize={} remoteSize={}",
localFiles.size(),
uploadFiles.size()

Check warning on line 2048 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L2047-L2048

Added lines #L2047 - L2048 were not covered by tests
)
);
}
}
} catch (Throwable e) {

Check warning on line 2053 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L2053

Added line #L2053 was not covered by tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe()
|| isRemoteSegmentStoreInSync();
|| isRemoteSegmentStoreInSync() == false;
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
}
Expand All @@ -187,7 +187,7 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
*
* @return true iff all the local files are uploaded to remote store.
*/
boolean isRemoteSegmentStoreInSync() {
boolean isRemoteSegmentStoreInSync() {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
return segmentInfosGatedCloseable.get().files(true).stream().allMatch(this::skipUpload);
} catch (Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.junit.After;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
Expand All @@ -38,6 +37,7 @@
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -46,17 +46,17 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH;
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH;
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;

public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
private IndexShard indexShard;
Expand Down Expand Up @@ -227,7 +227,6 @@ public void testRefreshAfterCommit() throws IOException {

verifyUploadedSegments(remoteSegmentStoreDirectory);


// This is to check if reading data from remote segment store works as well.
remoteSegmentStoreDirectory.init();

Expand Down Expand Up @@ -407,36 +406,16 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception {
}

public void testRefreshPersistentFailure() throws Exception {
// This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true
// Succeed on 3rd attempt
// int succeedOnAttempt = 5;
// // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
// CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// // Value has been set as INT_MAX as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
// CountDownLatch successLatch = new CountDownLatch(Integer.MAX_VALUE);
// Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
// succeedOnAttempt,
// refreshCountLatch,
// successLatch
// );
// assertBusy(() -> assertEquals(Integer.MAX_VALUE - 2, successLatch.getCount()));
// assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());

// This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true
// Succeed on 3rd attempt
int succeedOnAttempt = 10;
// We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(1);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
CountDownLatch successLatch = new CountDownLatch(10);
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch
);
assertBusy(() -> assertTrue(10 > successLatch.getCount()));
// Giving 10ms for some iterations of remote refresh upload
Thread.sleep(10);
assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());
}

Expand Down Expand Up @@ -548,6 +527,7 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
// Mock indexShard.getSegmentInfosSnapshot()
doAnswer(invocation -> {
if (counter.incrementAndGet() <= succeedOnAttempt) {
logger.error("Failing in get segment info {}", counter.get());
throw new RuntimeException("Inducing failure in upload");
}
return indexShard.getSegmentInfosSnapshot();
Expand Down Expand Up @@ -624,6 +604,31 @@ private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentSto
}
}
}
assertTrue(remoteStoreRefreshListener.isRemoteSegmentStoreInSync());
}

public void testRemoteSegmentStoreNotInSync() throws IOException {
setup(true, 3);
remoteStoreRefreshListener.afterRefresh(true);
try (Store remoteStore = indexShard.remoteStore()) {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
verifyUploadedSegments(remoteSegmentStoreDirectory);
remoteStoreRefreshListener.isRemoteSegmentStoreInSync();
boolean oneFileDeleted = false;
// Delete any one file from remote store
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
for (String file : segmentInfos.files(true)) {
if (oneFileDeleted == false && RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file) == false) {
remoteSegmentStoreDirectory.deleteFile(file);
oneFileDeleted = true;
break;
}
}
}
assertFalse(remoteStoreRefreshListener.isRemoteSegmentStoreInSync());
}
}

}

0 comments on commit f5d33c4

Please sign in to comment.