diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 4829148322b31..9311f1fabd2b2 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -173,7 +173,7 @@ public final EngineConfig config() { * Return the latest active SegmentInfos from the engine. * @return {@link SegmentInfos} */ - protected abstract SegmentInfos getLatestSegmentInfos(); + public abstract SegmentInfos getLatestSegmentInfos(); /** * In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos} diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index e4f4bbbba8f16..c9b5ccaca8800 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -431,7 +431,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - protected SegmentInfos getLatestSegmentInfos() { + public SegmentInfos getLatestSegmentInfos() { return readerManager.getSegmentInfos(); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 6262a9269c01c..bc54bb9433fb4 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -271,7 +271,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - protected SegmentInfos getLatestSegmentInfos() { + public SegmentInfos getLatestSegmentInfos() { return lastCommittedSegmentInfos; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5d11c34ca205c..c01260ef4909b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -160,7 +160,6 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.repositories.RepositoriesService; @@ -1374,18 +1373,31 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } /** - * Returns the lastest Replication Checkpoint that shard received + * Returns the IndexShardSate of current shard */ - public ReplicationCheckpoint getLatestReplicationCheckpoint() { - return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); + public IndexShardState getState() { + return this.state; + } + + /** + * Returns the lastest segmentInfos + */ + public SegmentInfos getLatestSegmentInfos() { + return getEngine().getLatestSegmentInfos(); } /** - * Invoked when a new checkpoint is received from a primary shard. Starts the copy process. + * Returns the lastest Replication Checkpoint that shard received */ - public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { - assert shardRouting.primary() == false; - // TODO + public ReplicationCheckpoint getLatestReplicationCheckpoint() { + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + return new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + latestSegmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + latestSegmentInfos.getVersion() + ); } /** diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 60076f1668af8..40a36e44f4f98 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.store.RateLimiter.SimpleRateLimiter; +import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -159,6 +160,7 @@ public class RecoverySettings { private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; + @Inject public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings); this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index afbb80d263805..242f4c953beb6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.RecoverySettings; @@ -27,6 +28,7 @@ public class SegmentReplicationSourceFactory { private RecoverySettings recoverySettings; private ClusterService clusterService; + @Inject public SegmentReplicationSourceFactory( TransportService transportService, RecoverySettings recoverySettings, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1c6053a72a4c5..da4145322ccd7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -14,9 +14,11 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.common.Nullable; +import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.RecoverySettings; @@ -38,7 +40,7 @@ * * @opensearch.internal */ -public final class SegmentReplicationTargetService implements IndexEventListener { +public class SegmentReplicationTargetService implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); @@ -58,6 +60,7 @@ public static class Actions { public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; } + @Inject public SegmentReplicationTargetService( final ThreadPool threadPool, final RecoverySettings recoverySettings, @@ -84,6 +87,31 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + /** + * Invoked when a new checkpoint is received from a primary shard. + * It checks if a new checkpoint should be processed or not and starts replication if needed. + */ + public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { + logger.trace("Checkpoint received {}", () -> requestCheckpoint); + if (shouldProcessCheckpoint(requestCheckpoint, indexShard)) { + logger.trace("Processing new checkpoint {}", requestCheckpoint); + startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace("Replication complete to {}", indexShard.getLatestReplicationCheckpoint()); + } + + @Override + public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + if (sendShardFailure == true) { + indexShard.failShard("replication failure", e); + } + } + }); + + } + } + public void startReplication( final ReplicationCheckpoint checkpoint, final IndexShard indexShard, @@ -98,6 +126,35 @@ public void startReplication(final SegmentReplicationTarget target) { threadPool.generic().execute(new ReplicationRunner(replicationId)); } + /** + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @param indexshard replica shard on which checkpoint is received + * @return true if checkpoint should be processed + */ + private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexshard) { + if (indexshard.getState().equals(IndexShardState.STARTED) == false) { + logger.debug("Ignore - shard is not started {}", indexshard.getState()); + return false; + } + ReplicationCheckpoint localCheckpoint = indexshard.getLatestReplicationCheckpoint(); + logger.debug("Local Checkpoint {}", indexshard.getLatestReplicationCheckpoint()); + if (onGoingReplications.isShardReplicating(indexshard.shardId())) { + logger.debug("Ignore - shard is currently replicating to a checkpoint"); + return false; + } + if (localCheckpoint.isAheadOf(requestCheckpoint)) { + logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); + return false; + } + if (localCheckpoint.equals(requestCheckpoint)) { + logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint); + return false; + } + return true; + } + /** * Listener that runs on changes in Replication state * diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index b74a69971ebd5..8093b6aee88f9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -28,6 +28,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -52,6 +53,8 @@ public class PublishCheckpointAction extends TransportReplicationAction< public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); + private final SegmentReplicationTargetService replicationService; + @Inject public PublishCheckpointAction( Settings settings, @@ -60,7 +63,8 @@ public PublishCheckpointAction( IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters + ActionFilters actionFilters, + SegmentReplicationTargetService targetService ) { super( settings, @@ -75,6 +79,7 @@ public PublishCheckpointAction( PublishCheckpointRequest::new, ThreadPool.Names.REFRESH ); + this.replicationService = targetService; } @Override @@ -165,7 +170,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh ActionListener.completeWith(listener, () -> { logger.trace("Checkpoint received on replica {}", request); if (request.getCheckpoint().getShardId().equals(replica.shardId())) { - replica.onNewCheckpoint(request); + replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } return new ReplicaResult(); }); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 98ab9cc4c1708..f84a65206190b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -115,7 +115,7 @@ public int hashCode() { * Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || segmentInfosVersion > other.getSegmentInfosVersion(); + return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index b8295f0685a7f..d648ca6041ff8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) { return cancelled; } + /** + * check if a shard is currently replicating + * + * @param shardId shardId for which to check if replicating + * @return true if shard is currently replicating + */ + public boolean isShardReplicating(ShardId shardId) { + return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId)); + } + /** * a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference * causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index aa17dec5767da..9e81f12355bbd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.junit.Assert; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -39,7 +40,7 @@ public void setUp() throws Exception { final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); final TransportService transportService = mock(TransportService.class); - indexShard = newShard(false, settings); + indexShard = newStartedShard(false, settings); checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); @@ -48,12 +49,6 @@ public void setUp() throws Exception { sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory); } - @Override - public void tearDown() throws Exception { - closeShards(indexShard); - super.tearDown(); - } - public void testTargetReturnsSuccess_listenerCompletes() throws IOException { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, @@ -111,6 +106,68 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept closeShards(indexShard); } + public void testAlreadyOnNewCheckpoint() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(indexShard); + } + + public void testShardAlreadyReplicating() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + final SegmentReplicationTarget target = new SegmentReplicationTarget( + checkpoint, + indexShard, + replicationSource, + mock(SegmentReplicationTargetService.SegmentReplicationListener.class) + ); + final SegmentReplicationTarget spyTarget = Mockito.spy(target); + spy.startReplication(spyTarget); + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(indexShard); + } + + public void testNewCheckpointBehindCurrentCheckpoint() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(indexShard); + } + + public void testShardNotStarted() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + IndexShard shard = newShard(false); + spy.onNewCheckpoint(checkpoint, shard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(shard); + closeShards(indexShard); + } + + public void testShouldProcessCheckpoint() throws IOException { + allowShardFailures(); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(indexShard); + ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint(); + ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint( + cp.getShardId(), + cp.getPrimaryTerm(), + cp.getSegmentsGen(), + cp.getSeqNo(), + cp.getSegmentInfosVersion() + 1 + ); + spy.onNewCheckpoint(newCheckpoint, spyShard); + ArgumentCaptor captor = ArgumentCaptor.forClass( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + verify(spy, times(1)).startReplication(any(), any(), captor.capture()); + SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); + listener.onFailure(new SegmentReplicationState(), new OpenSearchException("testing"), true); + verify(spyShard).failShard(any(), any()); + closeShard(indexShard, false); + + } + public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 074b5ff613b08..da6ab32cb2517 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -20,15 +20,15 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,7 +36,7 @@ import static org.mockito.Mockito.*; import static org.opensearch.test.ClusterServiceUtils.createClusterService; -public class PublishCheckpointActionTests extends OpenSearchTestCase { +public class PublishCheckpointActionTests extends IndexShardTestCase { private ThreadPool threadPool; private CapturingTransport transport; @@ -73,21 +73,16 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testPublishCheckpointActionOnPrimary() throws InterruptedException { + public void testPublishCheckpointActionOnPrimary() throws InterruptedException, IOException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); + final IndexShard indexShard = newShard(true); - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -96,7 +91,8 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -107,24 +103,20 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { // we should forward the request containing the current publish checkpoint to the replica assertThat(result.replicaRequest(), sameInstance(request)); })); + closeShards(indexShard); } - public void testPublishCheckpointActionOnReplica() { + public void testPublishCheckpointActionOnReplica() throws IOException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); + final IndexShard indexShard = newShard(false); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -133,7 +125,8 @@ public void testPublishCheckpointActionOnReplica() { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -145,12 +138,13 @@ public void testPublishCheckpointActionOnReplica() { final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // onNewCheckpoint should be called on shard with checkpoint request - verify(indexShard).onNewCheckpoint(request); + verify(mockTargetService).onNewCheckpoint(checkpoint, indexShard); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); assertTrue(success.get()); + closeShards(indexShard); }