Skip to content

Commit

Permalink
Add an integ test to depict seg rep happening from older primary post…
Browse files Browse the repository at this point in the history
… peer recovery

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Dec 27, 2022
1 parent bca1cde commit 647f6b8
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
Expand All @@ -53,6 +56,112 @@ private void createIndex() {
).get();
}

/**
* This test tries to mimic state where segment replication from older primary (after primary recovery) is still
* happening on target/replica node and not caught by existing guards (state/index/shard listeners). The test tries
* to simulate this issue by blocking segment replication from older primary to a replica node and then
* triggering a primary recovery to target. After primary change, the older primary still performing the segrep with
* replica node.
*/
public void testPrimaryRelocationWithDup() throws Exception {
final String old_primary = internalCluster().startNode();
createIndex();
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

CountDownLatch latch = new CountDownLatch(1);
// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
old_primary
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replica),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
try {
logger.info("--> blocking old primary");
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
connection.sendRequest(requestId, action, request, options);
}
);

final int initialDocCount = scaledRandomIntBetween(0, 200);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME); // this blocks the segrep on old primary -> replica

logger.info("--> start target node");
final String new_primary = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> relocate the shard");
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();

logger.info("--> state {}", state);

assertThat(
state.getRoutingNodes().node(state.nodes().resolveNode(new_primary).getId()).iterator().next().state(),
equalTo(ShardRoutingState.STARTED)
);

final int finalDocCount = initialDocCount;
for (int i = initialDocCount; i < 2 * initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);

final IndexShard indexShard = getIndexShard(new_primary);

ReplicationCollection<SegmentReplicationTarget> replications = internalCluster().getInstance(SegmentReplicationTargetService.class, replica).getOnGoingReplications();
PrimaryShardReplicationSource source = (PrimaryShardReplicationSource) replications.getOngoingReplicationTarget(indexShard.shardId()).getSource();

assertNotEquals(source.getSourceNode().getName(), old_primary);
logger.info("Source node {} {}", source.getSourceNode().getName(), old_primary);

logger.info("--> verifying count again {}", initialDocCount + finalDocCount);
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(
client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
);
assertHitCount(
client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
);
latch.countDown();
}


/**
* This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. Before
* relocation and after relocation documents are indexed and document is verified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public class PrimaryShardReplicationSource implements SegmentReplicationSource {
private final DiscoveryNode targetNode;
private final String targetAllocationId;

public DiscoveryNode getSourceNode() {
return sourceNode;
}

private final DiscoveryNode sourceNode;

public PrimaryShardReplicationSource(
DiscoveryNode targetNode,
String targetAllocationId,
Expand All @@ -56,6 +62,7 @@ public PrimaryShardReplicationSource(
logger
);
this.targetNode = targetNode;
this.sourceNode = sourceNode;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
public class SegmentReplicationTarget extends ReplicationTarget {

private final ReplicationCheckpoint checkpoint;

public SegmentReplicationSource getSource() {
return source;
}

private final SegmentReplicationSource source;
private final SegmentReplicationState state;
protected final MultiFileWriter multiFileWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public class SegmentReplicationTargetService implements IndexEventListener {
private final ThreadPool threadPool;
private final RecoverySettings recoverySettings;

public ReplicationCollection<SegmentReplicationTarget> getOnGoingReplications() {
return onGoingReplications;
}

private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;

private final SegmentReplicationSourceFactory sourceFactory;
Expand Down

0 comments on commit 647f6b8

Please sign in to comment.