From db32ef4b233666154a25ee8175d39b70d98392f7 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 1 Feb 2023 15:52:53 -0800 Subject: [PATCH] [Segment Replication] For replica recovery, force segment replication sync from peer recovery source (#5746) * [Segment Replication] For replica recovery, force segment replication sync from source Signed-off-by: Suraj Singh * Rebase against main Signed-off-by: Suraj Singh * Fix unit test Signed-off-by: Suraj Singh * PR feedback Signed-off-by: Suraj Singh * Fix remote store recovery test Signed-off-by: Suraj Singh --------- Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationIT.java | 79 ----------- .../SegmentReplicationRelocationIT.java | 129 ++++++++++++++++++ .../cluster/IndicesClusterStateService.java | 82 +---------- .../recovery/RecoverySourceHandler.java | 5 + .../SegmentReplicationTargetService.java | 7 +- .../SegmentReplicationIndexShardTests.java | 21 +-- ...teStorePeerRecoverySourceHandlerTests.java | 8 +- .../SegmentReplicationSourceHandlerTests.java | 3 +- .../SegmentReplicationTargetServiceTests.java | 4 +- ...enSearchIndexLevelReplicationTestCase.java | 2 +- .../index/shard/IndexShardTestCase.java | 14 +- 11 files changed, 170 insertions(+), 184 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 0101379321932..dfe27c54444d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,20 +9,15 @@ package org.opensearch.indices.replication; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.opensearch.OpenSearchCorruptionException; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import 
org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; -import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.BackgroundIndexer; @@ -36,7 +31,6 @@ import java.util.concurrent.CountDownLatch; import static java.util.Arrays.asList; -import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -139,79 +133,6 @@ public void testCancelPrimaryAllocation() throws Exception { verifyStoreContent(); } - /** - * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. - *

- * TODO: Ignoring this test as its flaky and needs separate fix - */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") - public void testAddNewReplicaFailure() throws Exception { - logger.info("--> starting [Primary Node] ..."); - final String primaryNode = internalCluster().startNode(); - - logger.info("--> creating test index ..."); - prepareCreate( - INDEX_NAME, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - - ).get(); - - logger.info("--> index 10 docs"); - for (int i = 0; i < 10; i++) { - client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); - } - logger.info("--> flush so we have some segment files on disk"); - flush(INDEX_NAME); - logger.info("--> index more docs so we have something in the translog"); - for (int i = 10; i < 20; i++) { - client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); - } - refresh(INDEX_NAME); - logger.info("--> verifying count"); - assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); - - logger.info("--> start empty node to add replica shard"); - final String replicaNode = internalCluster().startNode(); - - // Mock transport service to add behaviour of throwing corruption exception during segment replication process. 
- MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( - TransportService.class, - primaryNode - )); - mockTransportService.addSendBehavior( - internalCluster().getInstance(TransportService.class, replicaNode), - (connection, requestId, action, request, options) -> { - if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { - throw new OpenSearchCorruptionException("expected"); - } - connection.sendRequest(requestId, action, request, options); - } - ); - ensureGreen(INDEX_NAME); - // Add Replica shard to the new empty replica node - assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) - ); - - // Verify that cluster state is not green and replica shard failed during a round of segment replication is not added to the cluster - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("2") - .setWaitForGreenStatus() - .setTimeout(TimeValue.timeValueSeconds(2)) - .execute() - .actionGet(); - assertTrue(clusterHealthResponse.isTimedOut()); - ensureYellow(INDEX_NAME); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode); - assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); - } - public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 5b0948dace75d..95617dc229b97 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ 
b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -22,6 +22,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -32,6 +33,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + /** * This test class verifies primary shard relocation with segment replication as replication strategy. */ @@ -394,4 +397,130 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { waitForSearchableDocs(totalDocCount, replica, newPrimary); verifyStoreContent(); } + + /** + * This test verifies that adding a new node, which results in peer recovery of a replica, also brings the replica's + * replication checkpoint up to the primary's by performing a round of segment replication. 
+ */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testNewlyAddedReplicaIsUpdated() throws Exception { + final String primary = internalCluster().startNode(); + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have some segment files on disk"); + flush(INDEX_NAME); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L); + + logger.info("--> start empty node to add replica shard"); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + // Update replica count setting to 1 so that peer recovery is triggered and recovers the replica + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(2)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + ensureGreen(INDEX_NAME); + flushAndRefresh(INDEX_NAME); + waitForSearchableDocs(20, primary, replica); + verifyStoreContent(); + } + + /** + * This test verifies that the replica shard is not added to the cluster when a round of segment replication fails during peer recovery. 
+ * + * TODO: Ignoring this test as it's flaky and needs a separate fix + */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testAddNewReplicaFailure() throws Exception { + logger.info("--> starting [Primary Node] ..."); + final String primaryNode = internalCluster().startNode(); + + logger.info("--> creating test index ..."); + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + + ).get(); + + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have some segment files on disk"); + flush(INDEX_NAME); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + logger.info("--> verifying count"); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L); + + logger.info("--> start empty node to add replica shard"); + final String replica = internalCluster().startNode(); + + final CountDownLatch waitForRecovery = new CountDownLatch(1); + // Mock transport service to add behaviour of throwing corruption exception during segment replication process. 
+ MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replica), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + waitForRecovery.countDown(); + throw new OpenSearchCorruptionException("expected"); + } + connection.sendRequest(requestId, action, request, options); + } + ); + ensureGreen(INDEX_NAME); + // Add Replica shard to the new empty replica node + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica); + waitForRecovery.await(); + assertBusy(() -> assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME)))); + + // Verify that cluster state is not green and replica shard failed during a round of segment replication is not added to the cluster + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(2)) + .execute() + .actionGet(); + assertTrue(clusterHealthResponse.isTimedOut()); + ensureYellow(INDEX_NAME); + } } diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 83f4e0c7cbed9..966e2168e263c 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -37,7 +37,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import 
org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; -import org.opensearch.action.StepListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; @@ -47,7 +46,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingNode; @@ -84,11 +82,8 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationSourceService; -import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTargetService; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; -import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; @@ -782,82 +777,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { RecoveryState recoveryState = (RecoveryState) state; - AllocatedIndex indexService = indicesService.indexService(shardRouting.shardId().getIndex()); - StepListener forceSegRepListener = new StepListener<>(); - // For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before - // it is marked 
as Started. - if (indexService.getIndexSettings().isSegRepEnabled()) { - forceSegmentReplication(indexService, shardRouting, forceSegRepListener); - } else { - forceSegRepListener.onResponse(null); - } - forceSegRepListener.whenComplete( - v -> shardStateAction.shardStarted( - shardRouting, - primaryTerm, - "after " + recoveryState.getRecoverySource(), - SHARD_STATE_ACTION_LISTENER - ), - e -> handleRecoveryFailure(shardRouting, true, e) - ); - } - - /** - * Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary. - */ - private void forceSegmentReplication( - AllocatedIndex indexService, - ShardRouting shardRouting, - StepListener forceSegRepListener - ) { - IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); - if (indexShard != null - && indexShard.indexSettings().isSegRepEnabled() - && shardRouting.primary() == false - && shardRouting.state() == ShardRoutingState.INITIALIZING - && indexShard.state() == IndexShardState.POST_RECOVERY) { - segmentReplicationTargetService.startReplication( - ReplicationCheckpoint.empty(shardRouting.shardId()), - indexShard, - new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication complete, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ) - ); - forceSegRepListener.onResponse(null); - } - - @Override - public void onReplicationFailure( - SegmentReplicationState state, - ReplicationFailedException e, - boolean sendShardFailure - ) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication failed, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ) - ); - if (sendShardFailure == true) { - 
logger.error("replication failure", e); - indexShard.failShard("replication failure", e); - } - forceSegRepListener.onFailure(e); - } - } - ); - } else { - forceSegRepListener.onResponse(null); - } + shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } private void failAndRemoveShard( diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index c8c23cec6fd94..3fef056ac7f81 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -834,6 +834,11 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis * if the recovery process fails after disabling primary mode on the source shard, both relocation source and * target are failed (see {@link IndexShard#updateRoutingEntry}). 
*/ + } else { + // Force round of segment replication to update its checkpoint to primary's + if (shard.indexSettings().isSegRepEnabled()) { + recoveryTarget.forceSegmentFileSync(); + } } stopWatch.stop(); logger.info("finalizing recovery took [{}]", stopWatch.totalTime()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 0b17a2c4ca7f6..4f5aef01c3295 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -338,7 +338,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha } } - class ForceSyncTransportRequestHandler implements TransportRequestHandler { + private class ForceSyncTransportRequestHandler implements TransportRequestHandler { @Override public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception { assert indicesService != null; @@ -358,7 +358,10 @@ public void onReplicationDone(SegmentReplicationState state) { ) ); try { - indexShard.resetToWriteableEngine(); + // Promote engine type for primary target + if (indexShard.recoveryState().getPrimary() == true) { + indexShard.resetToWriteableEngine(); + } channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (InterruptedException | TimeoutException | IOException e) { throw new RuntimeException(e); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 13c1f05f1e60c..228a7e5268b01 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ 
-511,18 +511,17 @@ public void testReplicaReceivesLowerGeneration() throws Exception { replicateSegments(primary, List.of(replica_1)); assertEqualCommittedSegments(primary, replica_1); - assertLatestCommitGen(4, primary, replica_1); - assertLatestCommitGen(2, replica_2); + assertLatestCommitGen(4, primary); + assertLatestCommitGen(5, replica_1); + assertLatestCommitGen(3, replica_2); shards.promoteReplicaToPrimary(replica_2).get(); primary.close("demoted", false); primary.store().close(); IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); - assertLatestCommitGen(4, oldPrimary); - assertEqualCommittedSegments(oldPrimary, replica_1); - - assertLatestCommitGen(4, replica_2); + assertLatestCommitGen(5, oldPrimary); + assertLatestCommitGen(5, replica_2); numDocs = randomIntBetween(numDocs + 1, numDocs + 10); shards.indexDocs(numDocs); @@ -709,10 +708,12 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { for (IndexShard shard : shards.getReplicas()) { assertDocCounts(shard, totalDocs, numDocs); } - assertEquals(additonalDocs, nextPrimary.translogStats().estimatedNumberOfOperations()); - assertEquals(additonalDocs, replica.translogStats().estimatedNumberOfOperations()); - assertEquals(additonalDocs, nextPrimary.translogStats().getUncommittedOperations()); - assertEquals(additonalDocs, replica.translogStats().getUncommittedOperations()); + assertEquals(totalDocs, oldPrimary.translogStats().estimatedNumberOfOperations()); + assertEquals(totalDocs, oldPrimary.translogStats().estimatedNumberOfOperations()); + assertEquals(totalDocs, nextPrimary.translogStats().estimatedNumberOfOperations()); + assertEquals(totalDocs, replica.translogStats().estimatedNumberOfOperations()); + assertEquals(totalDocs, nextPrimary.translogStats().getUncommittedOperations()); + assertEquals(totalDocs, replica.translogStats().getUncommittedOperations()); // promote the replica 
shards.syncGlobalCheckpoint(); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 465629406b54b..65e235d8a59ee 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -53,16 +53,12 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { assertEquals(1, primary.getRetentionLeases().leases().size()); assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica1.routingEntry()))); - // Step 6 - Start new replica, recovery happens, and check that new replica has docs upto last flush + // Step 6 - Start new replica, recovery happens, and check that new replica has all docs final IndexShard replica2 = shards.addReplica(); shards.startAll(); - assertDocCount(replica2, numDocs); - - // Step 7 - Segment replication, check all shards have same number of docs - replicateSegments(primary, shards.getReplicas()); shards.assertAllEqual(numDocs + moreDocs); - // Step 8 - Check retention lease does not exist for the replica shard + // Step 7 - Check retention lease does not exist for the replica shard assertEquals(1, primary.getRetentionLeases().leases().size()); assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry()))); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index cde5cd980a91d..b5d8b2baf40dc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ 
b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -20,6 +20,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.StoreFileMetadata; @@ -51,7 +52,7 @@ public void setUp() throws Exception { super.setUp(); final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build(); primary = newStartedShard(true, settings); - replica = newShard(primary.shardId(), false); + replica = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 51afaef315f26..5ec1b7eb79dec 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -13,7 +13,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.engine.NRTReplicationEngineFactory; @@ -61,10 +60,9 @@ public void setUp() throws Exception { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("node.name", 
SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); - final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); primaryShard = newStartedShard(true, settings); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); - recoverReplica(replicaShard, primaryShard, true); + recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard)); checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, 0L); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 6b0f50d80fba2..8df57ccad85cc 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -545,7 +545,7 @@ public void recoverReplica( markAsRecovering, inSyncIds, routingTable, - (a) -> null + getReplicationFunc(replica) ); OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); computeReplicationTargets(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 179502f633ef6..4a3ce7e96759f 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -818,7 +818,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) { } protected void recoverReplica(IndexShard replica, IndexShard 
primary, boolean startReplica) throws IOException { - recoverReplica(replica, primary, startReplica, (a) -> null); + recoverReplica(replica, primary, startReplica, getReplicationFunc(replica)); } /** recovers a replica from the given primary **/ @@ -848,6 +848,18 @@ protected void recoverReplica( recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a) -> null); } + public Function, List> getReplicationFunc(final IndexShard target) { + return target.indexSettings().isSegRepEnabled() ? (shardList) -> { + try { + assert shardList.size() >= 2; + final IndexShard primary = shardList.get(0); + return replicateSegments(primary, shardList.subList(1, shardList.size())); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } : (a) -> null; + } + /** recovers a replica from the given primary **/ protected void recoverReplica( final IndexShard replica,