From 3ac908297d0a4c0bc874c9ed7b84c9b6e5a26590 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 8 Aug 2023 15:18:32 -0700 Subject: [PATCH 1/7] [Segment Replication] Add cancellation support in RemoteStoreReplicationSource Signed-off-by: Suraj Singh Self review Signed-off-by: Suraj Singh Address review comments Signed-off-by: Suraj Singh Address review comments Signed-off-by: Suraj Singh --- .../SegmentReplicationSuiteIT.java | 41 +++- .../replication/SegmentReplicationTarget.java | 6 +- .../index/shard/RemoteIndexShardTests.java | 175 +++++++++++++++++- .../SegmentReplicationIndexShardTests.java | 57 ------ ...licationWithNodeToNodeIndexShardTests.java | 52 ++++++ .../replication/TestReplicationSource.java | 2 +- 6 files changed, 266 insertions(+), 67 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index 800704eae7fa7..f02eaa6c2979d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -11,19 +11,51 @@ import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + private static final boolean remoteStoreEnabled = randomBoolean(); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + if (remoteStoreEnabled == true) { + builder = builder.put(remoteStoreClusterSettings(REPOSITORY_NAME)); + } + return builder.build(); + } + @Before public void setup() { internalCluster().startClusterManagerOnlyNode(); + if (remoteStoreEnabled == true) { + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME) + .setType("fs") + .setSettings(Settings.builder().put("location", randomRepoPath().toAbsolutePath())) + ); + } createIndex(INDEX_NAME); } + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, remoteStoreEnabled) + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } + @Override public Settings indexSettings() { final Settings.Builder builder = Settings.builder() @@ -33,12 +65,11 @@ public Settings indexSettings() { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); - // TODO: Randomly enable remote store on these tests. return builder.build(); } public void testBasicReplication() throws Exception { - final int docCount = scaledRandomIntBetween(10, 200); + final int docCount = scaledRandomIntBetween(10, 50); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } @@ -51,7 +82,7 @@ public void testDropRandomNodeDuringReplication() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); internalCluster().startClusterManagerOnlyNodes(1); - final int docCount = scaledRandomIntBetween(10, 200); + final int docCount = scaledRandomIntBetween(10, 50); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } @@ -67,7 +98,7 @@ public void testDropRandomNodeDuringReplication() throws Exception { public void testDeleteIndexWhileReplicating() throws Exception { internalCluster().startClusterManagerOnlyNode(); - final int docCount = scaledRandomIntBetween(10, 200); + final int docCount = scaledRandomIntBetween(10, 50); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } @@ -77,7 +108,7 @@ public void testDeleteIndexWhileReplicating() throws Exception { public void testFullRestartDuringReplication() throws Exception { internalCluster().startNode(); - final int docCount = scaledRandomIntBetween(10, 200); + final int docCount = scaledRandomIntBetween(10, 50); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7a5f9608dace0..079ee1c6d7aac 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -159,13 +159,15 @@ public void startReplication(ActionListener listener) { // Get list of files to copy from this checkpoint. state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); cancellableThreads.checkForCancel(); - source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); + cancellableThreads.execute(() -> source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener)); checkpointInfoListener.whenComplete(checkpointInfo -> { final List filesToFetch = getFiles(checkpointInfo); state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); - source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener); + cancellableThreads.execute( + () -> source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener) + ); }, listener::onFailure); getFilesListener.whenComplete(response -> { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 8e27c9ff9ae1a..7435b8ca73d5f 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -10,7 +10,14 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.Version; +import org.hamcrest.MatcherAssert; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.junit.Assert; +import org.junit.Before; +import org.opensearch.core.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; @@ -18,11 +25,11 @@ import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.common.ReplicationType; -import org.hamcrest.MatcherAssert; -import org.junit.Before; import java.io.IOException; import java.nio.file.Path; @@ -30,12 +37,23 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests { @@ -79,6 +97,159 @@ public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true); } + public void testCloseShardWhileGettingCheckpoint() throws Exception { + String indexMapping = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": {} }"; + try ( + ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), createTempDir()) + ) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + + // Create custom replication source in order to trigger shard close operations at specific point of segment replication + // lifecycle + SegmentReplicationSource source = new TestReplicationSource() { + RemoteSegmentStoreDirectory remoteDirectory; + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + // shard is closing while fetching metadata + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + FilterDirectory remoteStoreDirectory = (FilterDirectory) replica.remoteStore().directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); + + RemoteSegmentMetadata mdFile = null; + try { + mdFile = remoteDirectory.init(); + final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); + Map metadataMap = mdFile.getMetadata() + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> new StoreFileMetadata( + e.getValue().getOriginalFilename(), + e.getValue().getLength(), + Store.digestToString(Long.valueOf(e.getValue().getChecksum())), + version, + null + ) + ) + ); + listener.onResponse( + new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + Assert.fail("Unreachable"); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, primary, targetService); + shards.removeReplica(replica); + closeShards(replica); + } + } + + public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + shards.indexDocs(10); + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + RemoteSegmentStoreDirectory remoteDirectory; + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + try { + FilterDirectory remoteStoreDirectory = (FilterDirectory) replica.remoteStore().directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); + + RemoteSegmentMetadata mdFile = remoteDirectory.init(); + final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); + Map metadataMap = mdFile.getMetadata() + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> new StoreFileMetadata( + e.getValue().getOriginalFilename(), + e.getValue().getLength(), + Store.digestToString(Long.valueOf(e.getValue().getChecksum())), + version, + null + ) + ) + ); + listener.onResponse( + new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + try { + // shard is closing while we are copying files. + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + final Directory storeDirectory = indexShard.store().directory(); + for (StoreFileMetadata fileMetadata : filesToFetch) { + String file = fileMetadata.name(); + storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, primary, targetService); + shards.removeReplica(replica); + closeShards(replica); + } + } + public void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception { try ( ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), createTempDir()) diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 807b4a9cd7482..7b16ebef2170c 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -26,7 +26,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.index.IndexSettings; @@ -36,7 +35,6 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; -import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.SnapshotMatchers; @@ -44,8 +42,6 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.CheckpointInfoResponse; -import org.opensearch.indices.replication.GetSegmentFilesResponse; -import org.opensearch.indices.replication.SegmentReplicationSource; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTarget; @@ -84,7 +80,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { @@ -675,58 +670,6 @@ public void testCloseShardDuringFinalize() throws Exception { } } - public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { - try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - primary.refresh("Test"); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - - ActionListener listener; - - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - resolveCheckpointInfoResponseListener(listener, primary); - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - // set the listener, we will only fail it once we cancel the source. - this.listener = listener; - // shard is closing while we are copying files. - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - } - - @Override - public void cancel() { - // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . - final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); - listener.onFailure(exception); - } - }; - when(sourceFactory.get(any())).thenReturn(source); - startReplicationAndAssertCancellation(replica, primary, targetService); - - shards.removeReplica(replica); - closeShards(replica); - } - } - protected SegmentReplicationTargetService newTargetService(SegmentReplicationSourceFactory sourceFactory) { return new SegmentReplicationTargetService( threadPool, diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java index c394101697b47..353968db16e66 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java @@ -709,4 +709,56 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } } + public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + + ActionListener listener; + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + resolveCheckpointInfoResponseListener(listener, primary); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + // set the listener, we will only fail it once we cancel the source. + this.listener = listener; + // shard is closing while we are copying files. + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + } + + @Override + public void cancel() { + // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . + final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); + listener.onFailure(exception); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, primary, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java index b29e25a0bff2c..c75f105c0759c 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java @@ -41,6 +41,6 @@ public abstract void getSegmentFiles( @Override public String getDescription() { - return "This is a test description"; + return "TestReplicationSource"; } } From d89dfe3c413fc741bc0b9b167af60ffc6d0a3bd5 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 16 Aug 2023 11:15:39 -0700 Subject: [PATCH 2/7] Update SegmentReplicationSuiteIT Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationSuiteIT.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index f02eaa6c2979d..7e00ec8e6af22 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -16,6 +16,8 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; +import java.util.Random; + import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -23,7 +25,7 @@ public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { private static final String REPOSITORY_NAME = "test-remote-store-repo"; - private static final boolean remoteStoreEnabled = randomBoolean(); + private static final boolean remoteStoreEnabled = new Random().nextBoolean(); @Override protected Settings nodeSettings(int nodeOrdinal) { @@ -68,6 +70,13 @@ public Settings indexSettings() { return builder.build(); } + @After + public void teardown() { + if (remoteStoreEnabled == true) { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + } + public void testBasicReplication() throws Exception { final int docCount = scaledRandomIntBetween(10, 50); for (int i = 0; i < docCount; i++) { From 6885598c6b66553e6850d97c2cdf5d768d9a20b8 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 16 Aug 2023 12:30:35 -0700 Subject: [PATCH 3/7] Fix access control exception due to static instance usage Signed-off-by: Suraj Singh --- .../indices/replication/SegmentReplicationSuiteIT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index 7e00ec8e6af22..2721edfc740fe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -16,8 +16,6 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; -import java.util.Random; - import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -25,7 +23,7 @@ public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { private static final String REPOSITORY_NAME = "test-remote-store-repo"; - private static final boolean remoteStoreEnabled = new Random().nextBoolean(); + private final boolean remoteStoreEnabled = randomBoolean(); @Override protected Settings nodeSettings(int nodeOrdinal) { From 09991704b5f37b7f2aa0d76abb9da3779d0d5f01 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 16 Aug 2023 14:38:12 -0700 Subject: [PATCH 4/7] Spotless fix post main merge Signed-off-by: Suraj Singh --- .../SegmentReplicationSuiteIT.java | 1 + .../index/shard/RemoteIndexShardTests.java | 28 +++++++++---------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index 2721edfc740fe..ff5b6f141087f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -14,6 +14,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; import org.junit.Before; import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 7435b8ca73d5f..4e78748cc9845 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -10,26 +10,34 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.util.Version; -import org.hamcrest.MatcherAssert; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; -import org.junit.Assert; -import org.junit.Before; -import org.opensearch.core.action.ActionListener; +import org.apache.lucene.util.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.TestReplicationSource; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.SegmentReplicationSource; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; +import org.hamcrest.MatcherAssert; +import org.junit.Assert; +import org.junit.Before; import java.io.IOException; import java.nio.file.Path; @@ -37,14 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.opensearch.index.mapper.MapperService; -import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; -import org.opensearch.indices.replication.CheckpointInfoResponse; -import org.opensearch.indices.replication.GetSegmentFilesResponse; -import org.opensearch.indices.replication.SegmentReplicationSource; -import org.opensearch.indices.replication.SegmentReplicationTargetService; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; From 9b1574991bd3f0df922e054e8080b07365662342 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 16 Aug 2023 19:14:03 -0700 Subject: [PATCH 5/7] Add remote store enabled suite ITs Signed-off-by: Suraj Singh --- .../SegmentReplicationSuiteIT.java | 51 +------- ...mentReplicationWithRemoteStoreSuiteIT.java | 118 ++++++++++++++++++ 2 files changed, 124 insertions(+), 45 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index ff5b6f141087f..9025c1cc79884 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -8,55 +8,22 @@ package org.opensearch.indices.replication; +import org.junit.Before; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.After; -import org.junit.Before; - -import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { - private static final String REPOSITORY_NAME = "test-remote-store-repo"; - private final boolean remoteStoreEnabled = randomBoolean(); - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); - if (remoteStoreEnabled == true) { - builder = builder.put(remoteStoreClusterSettings(REPOSITORY_NAME)); - } - return builder.build(); - } - @Before public void setup() { internalCluster().startClusterManagerOnlyNode(); - if (remoteStoreEnabled == true) { - assertAcked( - clusterAdmin().preparePutRepository(REPOSITORY_NAME) - .setType("fs") - .setSettings(Settings.builder().put("location", randomRepoPath().toAbsolutePath())) - ); - } createIndex(INDEX_NAME); } - @Override - protected Settings featureFlagSettings() { - return Settings.builder() - .put(super.featureFlagSettings()) - .put(FeatureFlags.REMOTE_STORE, remoteStoreEnabled) - .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") - .build(); - } - @Override public Settings indexSettings() { final Settings.Builder builder = Settings.builder() @@ -66,18 +33,12 @@ public Settings indexSettings() { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + // TODO: Randomly enable remote store on these tests. return builder.build(); } - @After - public void teardown() { - if (remoteStoreEnabled == true) { - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); - } - } - public void testBasicReplication() throws Exception { - final int docCount = scaledRandomIntBetween(10, 50); + final int docCount = scaledRandomIntBetween(10, 200); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } @@ -90,7 +51,7 @@ public void testDropRandomNodeDuringReplication() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); internalCluster().startClusterManagerOnlyNodes(1); - final int docCount = scaledRandomIntBetween(10, 50); + final int docCount = scaledRandomIntBetween(10, 200); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } @@ -106,7 +67,7 @@ public void testDropRandomNodeDuringReplication() throws Exception { public void testDeleteIndexWhileReplicating() throws Exception { internalCluster().startClusterManagerOnlyNode(); - final int docCount = scaledRandomIntBetween(10, 50); + final int docCount = scaledRandomIntBetween(10, 200); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } @@ -116,7 +77,7 @@ public void testDeleteIndexWhileReplicating() throws Exception { public void testFullRestartDuringReplication() throws Exception { internalCluster().startNode(); - final int docCount = scaledRandomIntBetween(10, 50); + final int docCount = scaledRandomIntBetween(10, 200); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java new file mode 100644 index 0000000000000..f46589dc13d12 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) +public class SegmentReplicationWithRemoteStoreSuiteIT extends SegmentReplicationBaseIT { + + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(remoteStoreClusterSettings(REPOSITORY_NAME)).build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME) + .setType("fs") + .setSettings(Settings.builder().put("location", randomRepoPath().toAbsolutePath())) + ); + createIndex(INDEX_NAME); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } + + @Override + public Settings indexSettings() { + final Settings.Builder builder = Settings.builder() + .put(super.indexSettings()) + // reset shard & replica count to random values set by OpenSearchIntegTestCase. + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + + return builder.build(); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + public void testBasicReplication() throws Exception { + final int docCount = scaledRandomIntBetween(10, 50); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(); + ensureGreen(INDEX_NAME); + verifyStoreContent(); + } + + public void testDropRandomNodeDuringReplication() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + internalCluster().startClusterManagerOnlyNodes(1); + + final int docCount = scaledRandomIntBetween(10, 50); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(); + + internalCluster().restartRandomDataNode(); + + ensureYellow(INDEX_NAME); + client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get(); + internalCluster().startDataOnlyNode(); + client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); + } + + public void testDeleteIndexWhileReplicating() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int docCount = scaledRandomIntBetween(10, 50); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); + } + + public void testFullRestartDuringReplication() throws Exception { + internalCluster().startNode(); + final int docCount = scaledRandomIntBetween(10, 50); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + internalCluster().fullRestart(); + ensureGreen(INDEX_NAME); + } +} From 79e72105d0773ae8693a64357454a7ae7efbf414 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 16 Aug 2023 19:22:06 -0700 Subject: [PATCH 6/7] Spotless fix Signed-off-by: Suraj Singh --- .../indices/replication/SegmentReplicationSuiteIT.java | 2 +- .../replication/SegmentReplicationWithRemoteStoreSuiteIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index 9025c1cc79884..800704eae7fa7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -8,12 +8,12 @@ package org.opensearch.indices.replication; -import org.junit.Before; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java index f46589dc13d12..4319772033a65 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java @@ -27,7 +27,7 @@ public class SegmentReplicationWithRemoteStoreSuiteIT extends SegmentReplication @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(remoteStoreClusterSettings(REPOSITORY_NAME)).build(); + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(remoteStoreClusterSettings(REPOSITORY_NAME)).build(); } @Before From 4fc4af6355927e43b97e710618945e5b56149185 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 21 Aug 2023 19:11:43 -0700 Subject: [PATCH 7/7] Address review comments Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationTarget.java | 4 - .../index/shard/RemoteIndexShardTests.java | 182 ++++++------------ 2 files changed, 56 insertions(+), 130 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 079ee1c6d7aac..829012a65b991 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -158,13 +158,11 @@ public void startReplication(ActionListener listener) { logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description())); // Get list of files to copy from this checkpoint. state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); - cancellableThreads.checkForCancel(); cancellableThreads.execute(() -> source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener)); checkpointInfoListener.whenComplete(checkpointInfo -> { final List filesToFetch = getFiles(checkpointInfo); state.setStage(SegmentReplicationState.Stage.GET_FILES); - cancellableThreads.checkForCancel(); cancellableThreads.execute( () -> source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener) ); @@ -177,7 +175,6 @@ public void startReplication(ActionListener listener) { } private List getFiles(CheckpointInfoResponse checkpointInfo) throws IOException { - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap()); logger.trace(() -> new ParameterizedMessage("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff)); @@ -203,7 +200,6 @@ private List getFiles(CheckpointInfoResponse checkpointInfo) } private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException { - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); // Handle empty SegmentInfos bytes for recovering replicas if (checkpointInfoResponse.getInfosBytes() == null) { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 4e78748cc9845..8622e5944b165 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -10,9 +10,6 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -23,14 +20,11 @@ import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; -import org.opensearch.index.replication.TestReplicationSource; -import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; import org.opensearch.indices.replication.GetSegmentFilesResponse; -import org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.RemoteStoreReplicationSource; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -106,66 +100,16 @@ public void testCloseShardWhileGettingCheckpoint() throws Exception { IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); primary.refresh("Test"); - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - - // Create custom replication source in order to trigger shard close operations at specific point of segment replication - // lifecycle - SegmentReplicationSource source = new TestReplicationSource() { - RemoteSegmentStoreDirectory remoteDirectory; - - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - // shard is closing while fetching metadata - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - FilterDirectory remoteStoreDirectory = (FilterDirectory) replica.remoteStore().directory(); - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); - - RemoteSegmentMetadata mdFile = null; - try { - mdFile = remoteDirectory.init(); - final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); - Map metadataMap = mdFile.getMetadata() - .entrySet() - .stream() - .collect( - Collectors.toMap( - e -> e.getKey(), - e -> new StoreFileMetadata( - e.getValue().getOriginalFilename(), - e.getValue().getLength(), - Store.digestToString(Long.valueOf(e.getValue().getChecksum())), - version, - null - ) - ) - ); - listener.onResponse( - new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()) - ); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - Assert.fail("Unreachable"); - } - }; - when(sourceFactory.get(any())).thenReturn(source); + Runnable beforeCkpSourceCall = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + Runnable beforeGetFilesSourceCall = () -> Assert.fail("Should not have been executed"); + TestRSReplicationSource testRSReplicationSource = new TestRSReplicationSource( + replica, + beforeCkpSourceCall, + beforeGetFilesSourceCall + ); + when(sourceFactory.get(any())).thenReturn(testRSReplicationSource); startReplicationAndAssertCancellation(replica, primary, targetService); shards.removeReplica(replica); closeShards(replica); @@ -183,67 +127,14 @@ public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - RemoteSegmentStoreDirectory remoteDirectory; - - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - try { - FilterDirectory remoteStoreDirectory = (FilterDirectory) replica.remoteStore().directory(); - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); - - RemoteSegmentMetadata mdFile = remoteDirectory.init(); - final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); - Map metadataMap = mdFile.getMetadata() - .entrySet() - .stream() - .collect( - Collectors.toMap( - e -> e.getKey(), - e -> new StoreFileMetadata( - e.getValue().getOriginalFilename(), - e.getValue().getLength(), - Store.digestToString(Long.valueOf(e.getValue().getChecksum())), - version, - null - ) - ) - ); - listener.onResponse( - new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()) - ); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - try { - // shard is closing while we are copying files. - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - final Directory storeDirectory = indexShard.store().directory(); - for (StoreFileMetadata fileMetadata : filesToFetch) { - String file = fileMetadata.name(); - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - when(sourceFactory.get(any())).thenReturn(source); + Runnable beforeCkpSourceCall = () -> {}; + Runnable beforeGetFilesSourceCall = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + TestRSReplicationSource testRSReplicationSource = new TestRSReplicationSource( + replica, + beforeCkpSourceCall, + beforeGetFilesSourceCall + ); + when(sourceFactory.get(any())).thenReturn(testRSReplicationSource); startReplicationAndAssertCancellation(replica, primary, targetService); shards.removeReplica(replica); closeShards(replica); @@ -522,3 +413,42 @@ private void assertSingleSegmentFile(IndexShard shard, String fileName) throws I assertEquals(segmentsFileNames.stream().findFirst().get(), fileName); } } + +class TestRSReplicationSource extends RemoteStoreReplicationSource { + + private final Thread beforeCheckpoint; + private final Thread beforeGetFiles; + + public TestRSReplicationSource(IndexShard indexShard, Runnable beforeCheckpoint, Runnable beforeGetFiles) { + super(indexShard); + this.beforeCheckpoint = new Thread(beforeCheckpoint); + this.beforeGetFiles = new Thread(beforeGetFiles); + } + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + this.beforeCheckpoint.start(); + super.getCheckpointMetadata(replicationId, checkpoint, listener); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + this.beforeGetFiles.start(); + super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener); + } + + @Override + public String getDescription() { + return "TestReplicationSource"; + } +}