Skip to content

Commit

Permalink
[Segment Replication] Trigger a round of replication for replica shar…
Browse files Browse the repository at this point in the history
…ds during peer recovery when segment replication is enabled (opensearch-project#5332)

* Fix new added replica shards falling behind primary.

Signed-off-by: Rishikesh1159 <[email protected]>

* Trigger a round of replication during peer recovery when segment replication is enabled.

Signed-off-by: Rishikesh1159 <[email protected]>

* Remove unnecessary start replication overloaded method.

Signed-off-by: Rishikesh1159 <[email protected]>

* Add test for failure case and refactor some code.

Signed-off-by: Rishikesh1159 <[email protected]>

* Apply spotless check.

Signed-off-by: Rishikesh1159 <[email protected]>

* Addressing comments on the PR.

Signed-off-by: Rishikesh1159 <[email protected]>

* Remove unnecessary condition check.

Signed-off-by: Rishikesh1159 <[email protected]>

* Apply spotless check.

Signed-off-by: Rishikesh1159 <[email protected]>

* Add step listeners to resolve forcing round of segment replication.

Signed-off-by: Rishikesh1159 <[email protected]>

Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 authored and mch2 committed Mar 4, 2023
1 parent 415b06c commit 80f1452
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -56,8 +58,10 @@
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -596,6 +600,60 @@ public void testDeleteOperations() throws Exception {
}
}

public void testUpdateOperations() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);
final String replica = internalCluster().startNode();

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

// wait a short amount of time to give replication a chance to complete.
assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
waitForReplicaUpdate();

assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

Set<String> ids = indexer.getIds();
String id = ids.toArray()[0].toString();
UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id)
.setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.get();
assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh());
assertEquals(2, updateResponse.getVersion());

refresh(INDEX_NAME);
waitForReplicaUpdate();

assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);

}
}

private void assertSegmentStats(int numberOfReplicas) throws IOException {
final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

Expand Down Expand Up @@ -687,7 +745,7 @@ public void testDropPrimaryDuringReplication() throws Exception {

/**
* Waits until the replica is caught up to the latest primary segments gen.
* @throws Exception
* @throws Exception if assertion fails
*/
private void waitForReplicaUpdate() throws Exception {
// wait until the replica has the latest segment generation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand All @@ -47,11 +46,11 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RecoverySource.Type;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -89,6 +88,7 @@
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
Expand Down Expand Up @@ -724,7 +724,7 @@ private void updateShard(
shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"cluster-manager "
"master "
+ nodes.getClusterManagerNode()
+ " marked shard as initializing, but shard state is ["
+ state
Expand Down Expand Up @@ -804,7 +804,7 @@ public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting

/**
* Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary.
*/
*/
private void forceSegmentReplication(
AllocatedIndex<? extends Shard> indexService,
ShardRouting shardRouting,
Expand Down Expand Up @@ -834,7 +834,11 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
Expand Down

0 comments on commit 80f1452

Please sign in to comment.