From 647f6b860c40c17398ad3bc523ed25d5f59adf62 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 27 Dec 2022 15:20:47 -0800 Subject: [PATCH] Add an integ test to depict seg rep happening from older primary post peer recovery Signed-off-by: Suraj Singh --- .../SegmentReplicationRelocationIT.java | 109 ++++++++++++++++++ .../PrimaryShardReplicationSource.java | 7 ++ .../replication/SegmentReplicationTarget.java | 5 + .../SegmentReplicationTargetService.java | 4 + 4 files changed, 125 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index b87c9d07d04c4..d2a03a7e55467 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -23,6 +23,8 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -30,6 +32,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; @@ -53,6 +56,112 @@ private void createIndex() { ).get(); } + /** + * This test tries to mimic state where segment replication from older primary (after primary recovery) is still + * happening on target/replica node and not caught by existing guards (state/index/shard listeners). 
The test tries + * to simulate this issue by blocking segment replication from the older primary to a replica node and then + * triggering a primary recovery to the target. After the primary change, the older primary is still performing segrep with + * the replica node. + */ + public void testPrimaryRelocationWithDup() throws Exception { + final String old_primary = internalCluster().startNode(); + createIndex(); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + CountDownLatch latch = new CountDownLatch(1); + // Mock transport service on the old primary to block FILE_CHUNK requests sent to the replica until the latch is released. + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + old_primary + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replica), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + try { + logger.info("--> blocking old primary"); + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + for (int i = 0; i < initialDocCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); // this blocks the segrep on old primary -> replica + + logger.info("--> start target node"); + final String new_primary = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() +
.prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary)) + .execute() + .actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> get the state, verify shard 1 primary moved from node1 to node2"); + ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); + + logger.info("--> state {}", state); + + assertThat( + state.getRoutingNodes().node(state.nodes().resolveNode(new_primary).getId()).iterator().next().state(), + equalTo(ShardRoutingState.STARTED) + ); + + final int finalDocCount = initialDocCount; + for (int i = initialDocCount; i < 2 * initialDocCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + + final IndexShard indexShard = getIndexShard(new_primary); + + ReplicationCollection replications = internalCluster().getInstance(SegmentReplicationTargetService.class, replica).getOnGoingReplications(); + PrimaryShardReplicationSource source = (PrimaryShardReplicationSource) replications.getOngoingReplicationTarget(indexShard.shardId()).getSource(); + + assertNotEquals(source.getSourceNode().getName(), old_primary); + logger.info("Source node {} {}", source.getSourceNode().getName(), old_primary); + + logger.info("--> verifying count again {}", initialDocCount + finalDocCount); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount( + client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + assertHitCount( + client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + 
finalDocCount + ); + latch.countDown(); + } + + /** * This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. Before * relocation and after relocation documents are indexed and document is verified diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 8107f99723eaf..978c4e9f6e257 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -41,6 +41,12 @@ public class PrimaryShardReplicationSource implements SegmentReplicationSource { private final DiscoveryNode targetNode; private final String targetAllocationId; + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + private final DiscoveryNode sourceNode; + public PrimaryShardReplicationSource( DiscoveryNode targetNode, String targetAllocationId, @@ -56,6 +62,7 @@ public PrimaryShardReplicationSource( logger ); this.targetNode = targetNode; + this.sourceNode = sourceNode; } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index aadcb577f6174..c53d7f06b1f3d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -49,6 +49,11 @@ public class SegmentReplicationTarget extends ReplicationTarget { private final ReplicationCheckpoint checkpoint; + + public SegmentReplicationSource getSource() { + return source; + } + private final SegmentReplicationSource source; private final SegmentReplicationState state; protected final MultiFileWriter multiFileWriter; diff --git 
a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index b633f0fa3b9a0..13d971c26835a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -50,6 +50,10 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final ThreadPool threadPool; private final RecoverySettings recoverySettings; + public ReplicationCollection getOnGoingReplications() { + return onGoingReplications; + } + private final ReplicationCollection onGoingReplications; private final SegmentReplicationSourceFactory sourceFactory;