diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index a47b737c3e198..3f316524556ee 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -12,6 +12,7 @@ import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.ClusterState; @@ -33,6 +34,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; /** * This test class verifies primary shard relocation with segment replication as replication strategy. @@ -494,4 +496,72 @@ public void testAddNewReplicaFailure() throws Exception { assertTrue(clusterHealthResponse.isTimedOut()); ensureYellow(INDEX_NAME); } + + public void testFlushAfterRelocation() throws Exception { + // Starting two nodes with primary and replica shards respectively. + final String primaryNode = internalCluster().startNode(); + prepareCreate( + INDEX_NAME, + Settings.builder() + // we want to control refreshes + .put("index.refresh_interval", -1) + ).get(); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + // Start another empty node for relocation + final String newPrimary = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + ensureGreen(INDEX_NAME); + + // Start indexing docs + final int initialDocCount = scaledRandomIntBetween(2000, 3000); + for (int i = 0; i < initialDocCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + + // Verify segment replication event never happened on replica shard + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .execute() + .actionGet(); + assertFalse(segmentReplicationStatsResponse.hasSegmentReplicationStats()); + + // Relocate primary to new primary. When new primary starts it does perform a flush. + logger.info("--> relocate the shard from primary to newPrimary"); + ActionFuture relocationListener = client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, primaryNode, newPrimary)) + .execute(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + // Verify if all docs are present in replica after relocation, if new relocated primary doesn't flush after relocation the below + // assert will fail. + assertBusy( + () -> { + assertHitCount( + client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), + initialDocCount + ); + } + ); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index cb5a7fbc3f181..8071e94d1426d 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -346,11 +346,22 @@ public List segments(boolean verbose) { } @Override - public void refresh(String source) throws EngineException {} + public void refresh(String source) throws EngineException { + maybeRefresh(source); + } @Override public boolean maybeRefresh(String source) throws EngineException { - return false; + try { + return readerManager.maybeRefresh(); + } catch (IOException e) { + try { + failEngine("refresh failed source[" + source + "]", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new RefreshFailedEngineException(shardId, e); + } } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 8fbb24720aedc..00748acb1d76d 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -51,6 +51,10 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager { @Override protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException { Objects.requireNonNull(referenceToRefresh); + // checks if an actual refresh (change in segments) happened + if (unwrapStandardReader(referenceToRefresh).getSegmentInfos().version == currentInfos.version) { + return null; + } final List subs = new ArrayList<>(); final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh); for (LeafReaderContext ctx : standardDirectoryReader.leaves()) { diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index dd9d967d74ad5..66d095878d123 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) { + if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) { publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b28d02158a17a..ea7e71a6dd938 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -569,6 +569,12 @@ public void updateShardState( : "a primary relocation is completed by the cluster-managerr, but primary mode is not active " + currentRouting; changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); + + // Flush here after relocation of primary, so that replica get all changes from new primary rather than waiting for more + // docs to get indexed. + if (indexSettings.isSegRepEnabled()) { + flush(new FlushRequest().waitIfOngoing(true).force(true)); + } } else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isRelocated() diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index e3d19461f9e35..9e54b210fea04 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -13,18 +13,12 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.replication.ReplicationMode; -import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.ReplicationTask; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -33,18 +27,22 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.List; import java.util.Objects; +import org.opensearch.action.support.replication.ReplicationMode; + /** * Replication action responsible for publishing checkpoint to a replica shard. * @@ -110,32 +108,35 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) { * Publish checkpoint request to shard */ final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) { + String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); long primaryTerm = indexShard.getPendingPrimaryTerm(); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we have to execute under the system context so that if security is enabled the sync is authorized threadContext.markAsSystemContext(); PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); + final ReplicationTimer timer = new ReplicationTimer(); + timer.start(); + transportService.sendChildRequest( + indexShard.recoveryState().getTargetNode(), + transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } - final List replicationTargets = indexShard.getReplicationGroup().getReplicationTargets(); - for (ShardRouting replicationTarget : replicationTargets) { - if (replicationTarget.primary()) { - continue; - } - final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId()); - final ConcreteReplicaRequest replicaRequest = new ConcreteReplicaRequest<>( - request, - replicationTarget.allocationId().getId(), - primaryTerm, - indexShard.getLastKnownGlobalCheckpoint(), - indexShard.getMaxSeqNoOfUpdatesOrDeletes() - ); - final ReplicationTimer timer = new ReplicationTimer(); - timer.start(); - final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); - ActionListener listener = new ActionListener<>() { @Override - public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) { + public void handleResponse(ReplicationResponse response) { timer.stop(); logger.trace( () -> new ParameterizedMessage( @@ -150,22 +151,20 @@ public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) { } @Override - public void onFailure(Exception e) { + public void handleException(TransportException e) { timer.stop(); logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time()); task.setPhase("finished"); taskManager.unregister(task); - if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { - // node shutting down - return; - } if (ExceptionsHelper.unwrap( e, + NodeClosedException.class, IndexNotFoundException.class, AlreadyClosedException.class, - IndexShardClosedException.class + IndexShardClosedException.class, + ShardNotInPrimaryModeException.class ) != null) { - // the index was deleted or the shard is closed + // Node is shutting down or the index was deleted or the shard is closed return; } logger.warn( @@ -173,13 +172,8 @@ public void onFailure(Exception e) { e ); } - }; - final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>( - listener, - ReplicaResponse::new - ); - transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler); - } + } + ); logger.trace( () -> new ParameterizedMessage( "[shardId {}] Publishing replication checkpoint [{}]", @@ -196,7 +190,7 @@ protected void shardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - throw new OpenSearchException("PublishCheckpointAction should not hit primary shards"); + ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse())); } @Override diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 706410045ba60..d0d63dc96a86a 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -11,9 +11,11 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.search.ReferenceManager; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; import org.opensearch.index.seqno.LocalCheckpointTracker; @@ -179,6 +181,28 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep } } + public void testRefreshOnNRTEngine() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + ) { + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + + ReferenceManager referenceManager = nrtEngine.getReferenceManager(Engine.SearcherScope.EXTERNAL); + OpenSearchDirectoryReader readerBeforeRefresh = referenceManager.acquire(); + + nrtEngine.refresh("test refresh"); + OpenSearchDirectoryReader readerAfterRefresh = referenceManager.acquire(); + + // Verify both readers before and after refresh are same and no change in segments + assertSame(readerBeforeRefresh, readerAfterRefresh); + + } + } + public void testTrimTranslogOps() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index f359393de2b90..014a37249612b 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -8,7 +8,6 @@ package org.opensearch.index.shard; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; import org.opensearch.ExceptionsHelper; @@ -569,21 +568,15 @@ public void testReplicaReceivesLowerGeneration() throws Exception { numDocs = randomIntBetween(numDocs + 1, numDocs + 10); shards.indexDocs(numDocs); flushShard(primary, false); - assertLatestCommitGen(4, primary); replicateSegments(primary, List.of(replica_1)); assertEqualCommittedSegments(primary, replica_1); - assertLatestCommitGen(4, primary); - assertLatestCommitGen(5, replica_1); - assertLatestCommitGen(3, replica_2); shards.promoteReplicaToPrimary(replica_2).get(); primary.close("demoted", false); primary.store().close(); IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); - assertLatestCommitGen(5, oldPrimary); - assertLatestCommitGen(5, replica_2); numDocs = randomIntBetween(numDocs + 1, numDocs + 10); shards.indexDocs(numDocs); @@ -1078,14 +1071,6 @@ private IndexShard failAndPromoteRandomReplica(ReplicationGroup shards) throws I return newPrimary; } - private void assertLatestCommitGen(long expected, IndexShard... shards) throws IOException { - for (IndexShard indexShard : shards) { - try (final GatedCloseable commit = indexShard.acquireLastIndexCommit(false)) { - assertEquals(expected, commit.get().getGeneration()); - } - } - } - private void assertEqualCommittedSegments(IndexShard primary, IndexShard... replicas) throws IOException { for (IndexShard replica : replicas) { final SegmentInfos replicaInfos = replica.store().readLastCommittedSegmentsInfo(); diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index ea405f5b90c14..2c05fbc9328e5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -8,9 +8,9 @@ package org.opensearch.indices.replication.checkpoint; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; @@ -106,7 +107,10 @@ public void testPublishCheckpointActionOnPrimary() { final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); - expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); }); + action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { + // we should forward the request containing the current publish checkpoint to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + })); } public void testPublishCheckpointActionOnReplica() {