diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 0101379321932..dfe27c54444d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,20 +9,15 @@ package org.opensearch.indices.replication; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.opensearch.OpenSearchCorruptionException; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; -import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.BackgroundIndexer; @@ -36,7 +31,6 @@ import java.util.concurrent.CountDownLatch; import static java.util.Arrays.asList; -import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -139,79 +133,6 @@ public void testCancelPrimaryAllocation() throws Exception { verifyStoreContent(); } - /** - * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. - *
- * TODO: Ignoring this test as its flaky and needs separate fix
- */
- @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
- public void testAddNewReplicaFailure() throws Exception {
- logger.info("--> starting [Primary Node] ...");
- final String primaryNode = internalCluster().startNode();
-
- logger.info("--> creating test index ...");
- prepareCreate(
- INDEX_NAME,
- Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-
- ).get();
-
- logger.info("--> index 10 docs");
- for (int i = 0; i < 10; i++) {
- client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
- }
- logger.info("--> flush so we have some segment files on disk");
- flush(INDEX_NAME);
- logger.info("--> index more docs so we have something in the translog");
- for (int i = 10; i < 20; i++) {
- client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
- }
- refresh(INDEX_NAME);
- logger.info("--> verifying count");
- assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
-
- logger.info("--> start empty node to add replica shard");
- final String replicaNode = internalCluster().startNode();
-
- // Mock transport service to add behaviour of throwing corruption exception during segment replication process.
- MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
- TransportService.class,
- primaryNode
- ));
- mockTransportService.addSendBehavior(
- internalCluster().getInstance(TransportService.class, replicaNode),
- (connection, requestId, action, request, options) -> {
- if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
- throw new OpenSearchCorruptionException("expected");
- }
- connection.sendRequest(requestId, action, request, options);
- }
- );
- ensureGreen(INDEX_NAME);
- // Add Replica shard to the new empty replica node
- assertAcked(
- client().admin()
- .indices()
- .prepareUpdateSettings(INDEX_NAME)
- .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
- );
-
- // Verify that cluster state is not green and replica shard failed during a round of segment replication is not added to the cluster
- ClusterHealthResponse clusterHealthResponse = client().admin()
- .cluster()
- .prepareHealth()
- .setWaitForEvents(Priority.LANGUID)
- .setWaitForNodes("2")
- .setWaitForGreenStatus()
- .setTimeout(TimeValue.timeValueSeconds(2))
- .execute()
- .actionGet();
- assertTrue(clusterHealthResponse.isTimedOut());
- ensureYellow(INDEX_NAME);
- IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
- assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
- }
-
public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java
index 5b0948dace75d..95617dc229b97 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java
@@ -22,6 +22,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
+import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
@@ -32,6 +33,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
/**
* This test class verifies primary shard relocation with segment replication as replication strategy.
*/
@@ -394,4 +397,130 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
waitForSearchableDocs(totalDocCount, replica, newPrimary);
verifyStoreContent();
}
+
+ /**
+ * This test verifies that adding a new node which results in peer recovery as replica; also bring replica's
+ * replication checkpoint upto the primary's by performing a round of segment replication.
+ */
+ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
+ public void testNewlyAddedReplicaIsUpdated() throws Exception {
+ final String primary = internalCluster().startNode();
+ prepareCreate(
+ INDEX_NAME,
+ Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+ ).get();
+ for (int i = 0; i < 10; i++) {
+ client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+ }
+ logger.info("--> flush so we have some segment files on disk");
+ flush(INDEX_NAME);
+ logger.info("--> index more docs so we have something in the translog");
+ for (int i = 10; i < 20; i++) {
+ client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+ }
+ refresh(INDEX_NAME);
+ assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);
+
+ logger.info("--> start empty node to add replica shard");
+ final String replica = internalCluster().startNode();
+ ensureGreen(INDEX_NAME);
+ // Update replica count settings to 1 so that peer recovery triggers and recover replica
+ assertAcked(
+ client().admin()
+ .indices()
+ .prepareUpdateSettings(INDEX_NAME)
+ .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
+ );
+
+ ClusterHealthResponse clusterHealthResponse = client().admin()
+ .cluster()
+ .prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setWaitForNodes("2")
+ .setWaitForGreenStatus()
+ .setTimeout(TimeValue.timeValueSeconds(2))
+ .execute()
+ .actionGet();
+ assertFalse(clusterHealthResponse.isTimedOut());
+ ensureGreen(INDEX_NAME);
+ flushAndRefresh(INDEX_NAME);
+ waitForSearchableDocs(20, primary, replica);
+ verifyStoreContent();
+ }
+
+ /**
+ * This test verifies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
+ *
+ * TODO: Ignoring this test as its flaky and needs separate fix
+ */
+ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
+ public void testAddNewReplicaFailure() throws Exception {
+ logger.info("--> starting [Primary Node] ...");
+ final String primaryNode = internalCluster().startNode();
+
+ logger.info("--> creating test index ...");
+ prepareCreate(
+ INDEX_NAME,
+ Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+
+ ).get();
+
+ logger.info("--> index 10 docs");
+ for (int i = 0; i < 10; i++) {
+ client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+ }
+ logger.info("--> flush so we have some segment files on disk");
+ flush(INDEX_NAME);
+ logger.info("--> index more docs so we have something in the translog");
+ for (int i = 10; i < 20; i++) {
+ client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+ }
+ refresh(INDEX_NAME);
+ logger.info("--> verifying count");
+ assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);
+
+ logger.info("--> start empty node to add replica shard");
+ final String replica = internalCluster().startNode();
+
+ final CountDownLatch waitForRecovery = new CountDownLatch(1);
+ // Mock transport service to add behaviour of throwing corruption exception during segment replication process.
+ MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
+ TransportService.class,
+ primaryNode
+ ));
+ mockTransportService.addSendBehavior(
+ internalCluster().getInstance(TransportService.class, replica),
+ (connection, requestId, action, request, options) -> {
+ if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
+ waitForRecovery.countDown();
+ throw new OpenSearchCorruptionException("expected");
+ }
+ connection.sendRequest(requestId, action, request, options);
+ }
+ );
+ ensureGreen(INDEX_NAME);
+ // Add Replica shard to the new empty replica node
+ assertAcked(
+ client().admin()
+ .indices()
+ .prepareUpdateSettings(INDEX_NAME)
+ .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
+ );
+ IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica);
+ waitForRecovery.await();
+ assertBusy(() -> assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME))));
+
+ // Verify that cluster state is not green and replica shard failed during a round of segment replication is not added to the cluster
+ ClusterHealthResponse clusterHealthResponse = client().admin()
+ .cluster()
+ .prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setWaitForNodes("2")
+ .setWaitForGreenStatus()
+ .setTimeout(TimeValue.timeValueSeconds(2))
+ .execute()
+ .actionGet();
+ assertTrue(clusterHealthResponse.isTimedOut());
+ ensureYellow(INDEX_NAME);
+ }
}
diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
index 83f4e0c7cbed9..966e2168e263c 100644
--- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
+++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
@@ -37,7 +37,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
-import org.opensearch.action.StepListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
@@ -47,7 +46,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
-import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
@@ -84,11 +82,8 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
-import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
-import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
-import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
@@ -782,82 +777,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea
public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState recoveryState = (RecoveryState) state;
- AllocatedIndex extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
- StepListener, List