Skip to content

Commit

Permalink
Use SegmentReplicationTarget test to validate non-active on-disk file…
Browse files Browse the repository at this point in the history
…s are reused for replication (opensearch-project#11786)

* Use SegmentReplicationTarget test to validate non-active on-disk files are reused for replication

Signed-off-by: Suraj Singh <[email protected]>

* Fix original shard level unit test

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored and rayshrey committed Mar 18, 2024
1 parent 978ab80 commit 918b71d
Showing 1 changed file with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -371,7 +372,6 @@ public void testPrimaryRestart() throws Exception {
* prevent FileAlreadyExistsException. It does so by only copying files in first round of segment replication without
* committing locally so that in next round of segment replication those files are not considered for download again
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10885")
public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
Expand All @@ -388,6 +388,7 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
);
CountDownLatch latch = new CountDownLatch(1);

logger.info("--> Starting first round of replication");
// Start first round of segment replication. This should fail with simulated error but with replica having
// files in its local store but not in active reader.
final SegmentReplicationTarget target = targetService.startReplication(
Expand Down Expand Up @@ -427,6 +428,7 @@ public void onReplicationFailure(
// Start next round of segment replication and not throwing exception resulting in commit on replica
when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
CountDownLatch waitForSecondRound = new CountDownLatch(1);
logger.info("--> Starting second round of replication");
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
Expand Down Expand Up @@ -560,8 +562,19 @@ public void getSegmentFiles(
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
postGetFilesRunnable.run();
StepListener<GetSegmentFilesResponse> waitForCopyFilesListener = new StepListener();
super.getSegmentFiles(
replicationId,
checkpoint,
filesToFetch,
indexShard,
(fileName, bytesRecovered) -> {},
waitForCopyFilesListener
);
waitForCopyFilesListener.whenComplete(response -> {
postGetFilesRunnable.run();
listener.onResponse(response);
}, listener::onFailure);
}

@Override
Expand Down

0 comments on commit 918b71d

Please sign in to comment.