Skip to content

Commit

Permalink
Fix failing test & incorporate comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed May 19, 2023
1 parent 4803283 commit 44666b7
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends Abs

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(FeatureFlags.REMOTE_STORE, "true")
.build();
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ protected Settings nodeSettings(int nodeOriginal) {

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(FeatureFlags.REMOTE_STORE, "true")
.build();
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ public void testGetDefaultSettings() {
assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check bytes lag variance threshold default value
assertEquals(2.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d);
assertEquals(10.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d);

// Check time lag variance threshold default value
assertEquals(2.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d);
assertEquals(10.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d);

// Check minimum consecutive failures limit default value
assertEquals(5, pressureSettings.getMinConsecutiveFailuresLimit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand All @@ -42,6 +43,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
Expand Down Expand Up @@ -237,9 +239,19 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception {
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
CountDownLatch successLatch = new CountDownLatch(3);
mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch);
Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch
);
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
RemoteRefreshSegmentPressureService pressureService = tuple.v2();
RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
assertEquals(0, segmentTracker.getBytesLag());
assertEquals(0, segmentTracker.getRefreshSeqNoLag());
assertEquals(0, segmentTracker.getTimeMsLag());
assertEquals(0, segmentTracker.getTotalUploadsFailed());
}

public void testRefreshSuccessOnSecondAttempt() throws Exception {
Expand All @@ -251,9 +263,19 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
CountDownLatch successLatch = new CountDownLatch(3);
mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch);
Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch
);
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
RemoteRefreshSegmentPressureService pressureService = tuple.v2();
RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
assertEquals(0, segmentTracker.getBytesLag());
assertEquals(0, segmentTracker.getRefreshSeqNoLag());
assertEquals(0, segmentTracker.getTimeMsLag());
assertEquals(1, segmentTracker.getTotalUploadsFailed());
}

public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception {
Expand All @@ -265,9 +287,20 @@ public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception {
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
CountDownLatch successLatch = new CountDownLatch(3);
mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch);
Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch
);
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
RemoteRefreshSegmentPressureService pressureService = tuple.v2();
RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
assertEquals(0, segmentTracker.getBytesLag());
assertEquals(0, segmentTracker.getRefreshSeqNoLag());
assertEquals(0, segmentTracker.getTimeMsLag());
assertEquals(2, segmentTracker.getTotalUploadsFailed());

}

public void testTrackerData() throws Exception {
Expand Down Expand Up @@ -310,7 +343,10 @@ private Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> m
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
indexShard = newStartedShard(
true,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build(),
new InternalEngineFactory()
);

Expand Down

0 comments on commit 44666b7

Please sign in to comment.