From 0cf67979064c6c8be95299911db0d1bf1ea5ed68 Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Mon, 12 Dec 2022 07:37:02 -0800 Subject: [PATCH] [Segment Replication] Trigger a round of replication for replica shards during peer recovery when segment replication is enabled (#5332) * Fix new added replica shards falling behind primary. Signed-off-by: Rishikesh1159 * Trigger a round of replication during peer recovery when segment replication is enabled. Signed-off-by: Rishikesh1159 * Remove unnecessary start replication overloaded method. Signed-off-by: Rishikesh1159 * Add test for failure case and refactor some code. Signed-off-by: Rishikesh1159 * Apply spotless check. Signed-off-by: Rishikesh1159 * Addressing comments on the PR. Signed-off-by: Rishikesh1159 * Remove unnecessary condition check. Signed-off-by: Rishikesh1159 * Apply spotless check. Signed-off-by: Rishikesh1159 * Add step listeners to resolve forcing round of segment replication. Signed-off-by: Rishikesh1159 Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 86 +++++++++++++++-- .../cluster/IndicesClusterStateService.java | 93 ++++++++++++++++++- 2 files changed, 166 insertions(+), 13 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 2ceb4e0908df3..5ab1fc79fa68a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -10,6 +10,8 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.junit.BeforeClass; +import org.opensearch.OpenSearchCorruptionException; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; @@ -24,7 +26,9 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.common.Nullable; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; @@ -53,6 +57,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -194,6 +199,75 @@ public void testCancelPrimaryAllocation() throws Exception { assertSegmentStats(REPLICA_COUNT); } + /** + * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. + */ + public void testAddNewReplicaFailure() throws Exception { + logger.info("--> starting [Primary Node] ..."); + final String primaryNode = internalCluster().startNode(); + + logger.info("--> creating test index ..."); + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); + + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have some segment files on disk"); + flush(INDEX_NAME); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + logger.info("--> verifying count"); + assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); + + logger.info("--> start empty node to add replica shard"); + final String replicaNode = internalCluster().startNode(); + + // Mock transport service to add behaviour of throwing corruption exception during segment replication process. + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replicaNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + throw new OpenSearchCorruptionException("expected"); + } + connection.sendRequest(requestId, action, request, options); + } + ); + ensureGreen(INDEX_NAME); + // Add Replica shard to the new empty replica node + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + // Verify that cluster state is not green and replica shard failed during a round of segment replication is not added to the cluster + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(2)) + .execute() + .actionGet(); + assertTrue(clusterHealthResponse.isTimedOut()); + ensureYellow(INDEX_NAME); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode); + assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); + } + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); @@ -452,18 +526,14 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String replicaNode = internalCluster().startNode(); ensureGreen(INDEX_NAME); - client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); + assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); + refresh(INDEX_NAME); waitForReplicaUpdate(); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); - - IndexShard primaryShard = getIndexShard(primaryNode); - IndexShard replicaShard = getIndexShard(replicaNode); - assertEquals( - primaryShard.translogStats().estimatedNumberOfOperations(), - replicaShard.translogStats().estimatedNumberOfOperations() - ); assertSegmentStats(REPLICA_COUNT); } diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 15a9bf9e4c492..83f4e0c7cbed9 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; @@ -45,11 +46,12 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.component.AbstractLifecycleComponent; @@ -82,8 +84,11 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationSourceService; +import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; @@ -143,6 +148,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final Consumer globalCheckpointSyncer; private final RetentionLeaseSyncer retentionLeaseSyncer; + private final SegmentReplicationTargetService segmentReplicationTargetService; + private final SegmentReplicationCheckpointPublisher checkpointPublisher; @Inject @@ -217,6 +224,7 @@ public IndicesClusterStateService( indexEventListeners.add(segmentReplicationTargetService); indexEventListeners.add(segmentReplicationSourceService); } + this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); this.indicesService = indicesService; this.clusterService = clusterService; @@ -773,8 +781,83 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea } public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { - RecoveryState RecState = (RecoveryState) state; - shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + RecoveryState recoveryState = (RecoveryState) state; + AllocatedIndex indexService = indicesService.indexService(shardRouting.shardId().getIndex()); + StepListener forceSegRepListener = new StepListener<>(); + // For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before + // it is marked as Started. + if (indexService.getIndexSettings().isSegRepEnabled()) { + forceSegmentReplication(indexService, shardRouting, forceSegRepListener); + } else { + forceSegRepListener.onResponse(null); + } + forceSegRepListener.whenComplete( + v -> shardStateAction.shardStarted( + shardRouting, + primaryTerm, + "after " + recoveryState.getRecoverySource(), + SHARD_STATE_ACTION_LISTENER + ), + e -> handleRecoveryFailure(shardRouting, true, e) + ); + } + + /** + * Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary. + */ + private void forceSegmentReplication( + AllocatedIndex indexService, + ShardRouting shardRouting, + StepListener forceSegRepListener + ) { + IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); + if (indexShard != null + && indexShard.indexSettings().isSegRepEnabled() + && shardRouting.primary() == false + && shardRouting.state() == ShardRoutingState.INITIALIZING + && indexShard.state() == IndexShardState.POST_RECOVERY) { + segmentReplicationTargetService.startReplication( + ReplicationCheckpoint.empty(shardRouting.shardId()), + indexShard, + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication complete, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + forceSegRepListener.onResponse(null); + } + + @Override + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + if (sendShardFailure == true) { + logger.error("replication failure", e); + indexShard.failShard("replication failure", e); + } + forceSegRepListener.onFailure(e); + } + } + ); + } else { + forceSegRepListener.onResponse(null); + } } private void failAndRemoveShard(