diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index e0fa0dbe1e82a..5ab1fc79fa68a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -17,6 +17,8 @@ import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -56,8 +58,10 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends OpenSearchIntegTestCase { @@ -596,6 +600,60 @@ public void testDeleteOperations() throws Exception { } } + public void testUpdateOperations() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + final String replica = internalCluster().startNode(); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + // wait a short amount of time to give replication a chance to complete. + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + waitForReplicaUpdate(); + + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + Set ids = indexer.getIds(); + String id = ids.toArray()[0].toString(); + UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id) + .setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh()); + assertEquals(2, updateResponse.getVersion()); + + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + + } + } + private void assertSegmentStats(int numberOfReplicas) throws IOException { final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); @@ -687,7 +745,7 @@ public void testDropPrimaryDuringReplication() throws Exception { /** * Waits until the replica is caught up to the latest primary segments gen. - * @throws Exception + * @throws Exception if assertion fails */ private void waitForReplicaUpdate() throws Exception { // wait until the replica has the latest segment generation. diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 59263878f1f76..83f4e0c7cbed9 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -35,7 +35,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -47,11 +46,11 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -89,6 +88,7 @@ import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; @@ -724,7 +724,7 @@ private void updateShard( shardStateAction.shardStarted( shardRouting, primaryTerm, - "cluster-manager " + "master " + nodes.getClusterManagerNode() + " marked shard as initializing, but shard state is [" + state @@ -804,7 +804,7 @@ public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting /** * Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary. - */ + */ private void forceSegmentReplication( AllocatedIndex indexService, ShardRouting shardRouting, @@ -834,7 +834,11 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { logger.trace( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication failed, timing data: {}",