diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 0d8082da00cd0..d79fe76022ea8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -56,7 +56,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -219,7 +218,57 @@ public void testCancelPrimaryAllocation() throws Exception { } /** - * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. + * This test verifies that a replica recovered onto a newly added node via peer recovery is also brought up to the + * primary's replication checkpoint by performing a round of segment replication.
+ */ + public void testNewlyAddedReplicaIsUpdated() { + internalCluster().startNode(featureFlagSettings()); + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have some segment files on disk"); + flush(INDEX_NAME); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + assertEquals(20L, client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value); + + logger.info("--> start empty node to add replica shard"); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); + ensureGreen(INDEX_NAME); + // Raise the replica count to 1 so that peer recovery is triggered and allocates the replica on replicaNode + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + // Verify that the cluster turns green, i.e. the replica finished peer recovery and was added to the cluster + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(2)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + ensureGreen(INDEX_NAME); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode); + assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME))); + 
assertEquals(20L, client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value); + } + + /** + * This test verifies that the replica shard is not added to the cluster when the round of segment replication performed during peer recovery fails. * * TODO: Ignoring this test as its flaky and needs separate fix */ @@ -246,7 +295,7 @@ public void testAddNewReplicaFailure() throws Exception { } refresh(INDEX_NAME); logger.info("--> verifying count"); - assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); + assertEquals(20L, client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value); logger.info("--> start empty node to add replica shard"); final String replicaNode = internalCluster().startNode(featureFlagSettings()); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index e8adcbdc1c89a..966e2168e263c 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -37,7 +37,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; -import org.opensearch.action.StepListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; @@ -83,11 +82,8 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationSourceService; -import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTargetService; -import
org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; -import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; @@ -781,78 +777,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { RecoveryState recoveryState = (RecoveryState) state; - AllocatedIndex indexService = indicesService.indexService(shardRouting.shardId().getIndex()); - StepListener forceSegRepListener = new StepListener<>(); - // For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before - // it is marked as Started. - if (indexService.getIndexSettings().isSegRepEnabled()) { - forceSegmentReplication(indexService, shardRouting, forceSegRepListener); - } else { - forceSegRepListener.onResponse(null); - } - forceSegRepListener.whenComplete( - v -> shardStateAction.shardStarted( - shardRouting, - primaryTerm, - "after " + recoveryState.getRecoverySource(), - SHARD_STATE_ACTION_LISTENER - ), - e -> handleRecoveryFailure(shardRouting, true, e) - ); - } - - /** - * Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary. 
- */ - private void forceSegmentReplication( - AllocatedIndex indexService, - ShardRouting shardRouting, - StepListener forceSegRepListener - ) { - IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); - if (indexShard != null && indexShard.isSegmentReplicationAllowed()) { - segmentReplicationTargetService.startReplication( - ReplicationCheckpoint.empty(shardRouting.shardId()), - indexShard, - new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication complete, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ) - ); - forceSegRepListener.onResponse(null); - } - - @Override - public void onReplicationFailure( - SegmentReplicationState state, - ReplicationFailedException e, - boolean sendShardFailure - ) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication failed, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ) - ); - if (sendShardFailure == true) { - logger.error("replication failure", e); - indexShard.failShard("replication failure", e); - } - forceSegRepListener.onFailure(e); - } - } - ); - } else { - forceSegRepListener.onResponse(null); - } + shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } private void failAndRemoveShard( diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 276821dfb09b4..75691632f2252 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ 
-839,7 +839,12 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis * target are failed (see {@link IndexShard#updateRoutingEntry}). */ } else { - handoffListener.onResponse(null); + // Force round of segment replication to update its checkpoint to primary's + if (shard.indexSettings().isSegRepEnabled()) { + recoveryTarget.forceSegmentFileSync(handoffListener); + } else { + handoffListener.onResponse(null); + } } handoffListener.whenComplete(res -> { stopWatch.stop(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 85a34878af03f..c8ebae25bd4cb 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -339,7 +339,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha } } - class ForceSyncTransportRequestHandler implements TransportRequestHandler { + private class ForceSyncTransportRequestHandler implements TransportRequestHandler { @Override public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception { assert indicesService != null; @@ -359,7 +359,10 @@ public void onReplicationDone(SegmentReplicationState state) { ) ); try { - indexShard.resetToWriteableEngine(); + // Promote engine type for primary target + if (indexShard.recoveryState().getPrimary() == true) { + indexShard.resetToWriteableEngine(); + } channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (InterruptedException | TimeoutException | IOException e) { throw new RuntimeException(e);