From 8b288d815e46c837861637e3d7b1f2451e6d8d22 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 8 Jun 2022 16:46:38 +0000 Subject: [PATCH 01/12] Adding onNewCheckpoint and it's test to start replication. SCheck for latestcheckpoint and replaying logic is removed from this commit and will be added in a different PR Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/engine/Engine.java | 2 +- .../index/engine/NRTReplicationEngine.java | 2 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../opensearch/index/shard/IndexShard.java | 28 +++++--- .../indices/recovery/RecoverySettings.java | 2 + .../SegmentReplicationSourceFactory.java | 2 + .../SegmentReplicationTargetService.java | 59 ++++++++++++++- .../checkpoint/PublishCheckpointAction.java | 9 ++- .../checkpoint/ReplicationCheckpoint.java | 2 +- .../common/ReplicationCollection.java | 10 +++ .../SegmentReplicationTargetServiceTests.java | 71 +++++++++++++++++-- .../PublishCheckpointActionTests.java | 40 +++++------ 12 files changed, 184 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 4829148322b31..9311f1fabd2b2 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -173,7 +173,7 @@ public final EngineConfig config() { * Return the latest active SegmentInfos from the engine. * @return {@link SegmentInfos} */ - protected abstract SegmentInfos getLatestSegmentInfos(); + public abstract SegmentInfos getLatestSegmentInfos(); /** * In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos} diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index e4f4bbbba8f16..c9b5ccaca8800 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -431,7 +431,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - protected SegmentInfos getLatestSegmentInfos() { + public SegmentInfos getLatestSegmentInfos() { return readerManager.getSegmentInfos(); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 6262a9269c01c..bc54bb9433fb4 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -271,7 +271,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - protected SegmentInfos getLatestSegmentInfos() { + public SegmentInfos getLatestSegmentInfos() { return lastCommittedSegmentInfos; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5d11c34ca205c..c01260ef4909b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -160,7 +160,6 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.repositories.RepositoriesService; @@ -1374,18 +1373,31 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } /** - * Returns the lastest Replication Checkpoint that shard received + * Returns the IndexShardSate of current shard */ - public ReplicationCheckpoint getLatestReplicationCheckpoint() { - return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); + public IndexShardState getState() { + return this.state; + } + + /** + * Returns the lastest segmentInfos + */ + public SegmentInfos getLatestSegmentInfos() { + return getEngine().getLatestSegmentInfos(); } /** - * Invoked when a new checkpoint is received from a primary shard. Starts the copy process. + * Returns the lastest Replication Checkpoint that shard received */ - public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { - assert shardRouting.primary() == false; - // TODO + public ReplicationCheckpoint getLatestReplicationCheckpoint() { + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + return new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + latestSegmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + latestSegmentInfos.getVersion() + ); } /** diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 60076f1668af8..40a36e44f4f98 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.store.RateLimiter.SimpleRateLimiter; +import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -159,6 +160,7 @@ public class RecoverySettings { private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; + @Inject public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings); this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index afbb80d263805..242f4c953beb6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.RecoverySettings; @@ -27,6 +28,7 @@ public class SegmentReplicationSourceFactory { private RecoverySettings recoverySettings; private ClusterService clusterService; + @Inject public SegmentReplicationSourceFactory( TransportService transportService, RecoverySettings recoverySettings, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1c6053a72a4c5..da4145322ccd7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -14,9 +14,11 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.common.Nullable; +import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.RecoverySettings; @@ -38,7 +40,7 @@ * * @opensearch.internal */ -public final class SegmentReplicationTargetService implements IndexEventListener { +public class SegmentReplicationTargetService implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); @@ -58,6 +60,7 @@ public static class Actions { public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; } + @Inject public SegmentReplicationTargetService( final ThreadPool threadPool, final RecoverySettings recoverySettings, @@ -84,6 +87,31 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + /** + * Invoked when a new checkpoint is received from a primary shard. + * It checks if a new checkpoint should be processed or not and starts replication if needed. + */ + public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { + logger.trace("Checkpoint received {}", () -> requestCheckpoint); + if (shouldProcessCheckpoint(requestCheckpoint, indexShard)) { + logger.trace("Processing new checkpoint {}", requestCheckpoint); + startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace("Replication complete to {}", indexShard.getLatestReplicationCheckpoint()); + } + + @Override + public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + if (sendShardFailure == true) { + indexShard.failShard("replication failure", e); + } + } + }); + + } + } + public void startReplication( final ReplicationCheckpoint checkpoint, final IndexShard indexShard, @@ -98,6 +126,35 @@ public void startReplication(final SegmentReplicationTarget target) { threadPool.generic().execute(new ReplicationRunner(replicationId)); } + /** + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @param indexshard replica shard on which checkpoint is received + * @return true if checkpoint should be processed + */ + private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexshard) { + if (indexshard.getState().equals(IndexShardState.STARTED) == false) { + logger.debug("Ignore - shard is not started {}", indexshard.getState()); + return false; + } + ReplicationCheckpoint localCheckpoint = indexshard.getLatestReplicationCheckpoint(); + logger.debug("Local Checkpoint {}", indexshard.getLatestReplicationCheckpoint()); + if (onGoingReplications.isShardReplicating(indexshard.shardId())) { + logger.debug("Ignore - shard is currently replicating to a checkpoint"); + return false; + } + if (localCheckpoint.isAheadOf(requestCheckpoint)) { + logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); + return false; + } + if (localCheckpoint.equals(requestCheckpoint)) { + logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint); + return false; + } + return true; + } + /** * Listener that runs on changes in Replication state * diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index b74a69971ebd5..8093b6aee88f9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -28,6 +28,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -52,6 +53,8 @@ public class PublishCheckpointAction extends TransportReplicationAction< public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); + private final SegmentReplicationTargetService replicationService; + @Inject public PublishCheckpointAction( Settings settings, @@ -60,7 +63,8 @@ public PublishCheckpointAction( IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters + ActionFilters actionFilters, + SegmentReplicationTargetService targetService ) { super( settings, @@ -75,6 +79,7 @@ public PublishCheckpointAction( PublishCheckpointRequest::new, ThreadPool.Names.REFRESH ); + this.replicationService = targetService; } @Override @@ -165,7 +170,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh ActionListener.completeWith(listener, () -> { logger.trace("Checkpoint received on replica {}", request); if (request.getCheckpoint().getShardId().equals(replica.shardId())) { - replica.onNewCheckpoint(request); + replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } return new ReplicaResult(); }); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 98ab9cc4c1708..f84a65206190b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -115,7 +115,7 @@ public int hashCode() { * Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || segmentInfosVersion > other.getSegmentInfosVersion(); + return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index b8295f0685a7f..d648ca6041ff8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) { return cancelled; } + /** + * check if a shard is currently replicating + * + * @param shardId shardId for which to check if replicating + * @return true if shard is currently replicating + */ + public boolean isShardReplicating(ShardId shardId) { + return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId)); + } + /** * a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference * causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index aa17dec5767da..9e81f12355bbd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.junit.Assert; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -39,7 +40,7 @@ public void setUp() throws Exception { final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); final TransportService transportService = mock(TransportService.class); - indexShard = newShard(false, settings); + indexShard = newStartedShard(false, settings); checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); @@ -48,12 +49,6 @@ public void setUp() throws Exception { sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory); } - @Override - public void tearDown() throws Exception { - closeShards(indexShard); - super.tearDown(); - } - public void testTargetReturnsSuccess_listenerCompletes() throws IOException { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, @@ -111,6 +106,68 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept closeShards(indexShard); } + public void testAlreadyOnNewCheckpoint() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(indexShard); + } + + public void testShardAlreadyReplicating() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + final SegmentReplicationTarget target = new SegmentReplicationTarget( + checkpoint, + indexShard, + replicationSource, + mock(SegmentReplicationTargetService.SegmentReplicationListener.class) + ); + final SegmentReplicationTarget spyTarget = Mockito.spy(target); + spy.startReplication(spyTarget); + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(indexShard); + } + + public void testNewCheckpointBehindCurrentCheckpoint() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(indexShard); + } + + public void testShardNotStarted() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + IndexShard shard = newShard(false); + spy.onNewCheckpoint(checkpoint, shard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(shard); + closeShards(indexShard); + } + + public void testShouldProcessCheckpoint() throws IOException { + allowShardFailures(); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(indexShard); + ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint(); + ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint( + cp.getShardId(), + cp.getPrimaryTerm(), + cp.getSegmentsGen(), + cp.getSeqNo(), + cp.getSegmentInfosVersion() + 1 + ); + spy.onNewCheckpoint(newCheckpoint, spyShard); + ArgumentCaptor captor = ArgumentCaptor.forClass( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + verify(spy, times(1)).startReplication(any(), any(), captor.capture()); + SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); + listener.onFailure(new SegmentReplicationState(), new OpenSearchException("testing"), true); + verify(spyShard).failShard(any(), any()); + closeShard(indexShard, false); + + } + public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 074b5ff613b08..da6ab32cb2517 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -20,15 +20,15 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,7 +36,7 @@ import static org.mockito.Mockito.*; import static org.opensearch.test.ClusterServiceUtils.createClusterService; -public class PublishCheckpointActionTests extends OpenSearchTestCase { +public class PublishCheckpointActionTests extends IndexShardTestCase { private ThreadPool threadPool; private CapturingTransport transport; @@ -73,21 +73,16 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testPublishCheckpointActionOnPrimary() throws InterruptedException { + public void testPublishCheckpointActionOnPrimary() throws InterruptedException, IOException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); + final IndexShard indexShard = newShard(true); - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -96,7 +91,8 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -107,24 +103,20 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { // we should forward the request containing the current publish checkpoint to the replica assertThat(result.replicaRequest(), sameInstance(request)); })); + closeShards(indexShard); } - public void testPublishCheckpointActionOnReplica() { + public void testPublishCheckpointActionOnReplica() throws IOException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); + final IndexShard indexShard = newShard(false); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -133,7 +125,8 @@ public void testPublishCheckpointActionOnReplica() { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -145,12 +138,13 @@ public void testPublishCheckpointActionOnReplica() { final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // onNewCheckpoint should be called on shard with checkpoint request - verify(indexShard).onNewCheckpoint(request); + verify(mockTargetService).onNewCheckpoint(checkpoint, indexShard); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); assertTrue(success.get()); + closeShards(indexShard); } From 02c1be36d445ac6fe64a19a41c014ef194286a61 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 9 Jun 2022 17:33:44 +0000 Subject: [PATCH 02/12] Changing binding/inject logic and addressing comments from PR Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 2 +- .../index/engine/NRTReplicationEngine.java | 2 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../opensearch/index/shard/IndexShard.java | 9 +------- .../indices/recovery/RecoverySettings.java | 1 - .../SegmentReplicationSourceFactory.java | 1 - .../SegmentReplicationTargetService.java | 21 ++++++++++--------- .../main/java/org/opensearch/node/Node.java | 4 ++++ 9 files changed, 20 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 9311f1fabd2b2..4829148322b31 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -173,7 +173,7 @@ public final EngineConfig config() { * Return the latest active SegmentInfos from the engine. * @return {@link SegmentInfos} */ - public abstract SegmentInfos getLatestSegmentInfos(); + protected abstract SegmentInfos getLatestSegmentInfos(); /** * In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos} diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index b63a39ebb1222..d2d688a90353e 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2289,7 +2289,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = internalReaderManager.acquire(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index c9b5ccaca8800..e4f4bbbba8f16 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -431,7 +431,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { return readerManager.getSegmentInfos(); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index bc54bb9433fb4..6262a9269c01c 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -271,7 +271,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { return lastCommittedSegmentInfos; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c01260ef4909b..b6083580964a0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1372,18 +1372,11 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } } - /** - * Returns the IndexShardSate of current shard - */ - public IndexShardState getState() { - return this.state; - } - /** * Returns the lastest segmentInfos */ public SegmentInfos getLatestSegmentInfos() { - return getEngine().getLatestSegmentInfos(); + return getEngine().getSegmentInfosSnapshot().get(); } /** diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 40a36e44f4f98..08dbbb194d16c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -160,7 +160,6 @@ public class RecoverySettings { private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; - @Inject public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings); this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 242f4c953beb6..453075bf7d24e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -28,7 +28,6 @@ public class SegmentReplicationSourceFactory { private RecoverySettings recoverySettings; private ClusterService clusterService; - @Inject public SegmentReplicationSourceFactory( TransportService transportService, RecoverySettings recoverySettings, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index da4145322ccd7..6e5a5e3f57e4f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -90,6 +90,8 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. + * @param requestCheckpoint received checkpoint that is checked for processing + * @param indexShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { logger.trace("Checkpoint received {}", () -> requestCheckpoint); @@ -130,26 +132,25 @@ public void startReplication(final SegmentReplicationTarget target) { * Checks if checkpoint should be processed * * @param requestCheckpoint received checkpoint that is checked for processing - * @param indexshard replica shard on which checkpoint is received + * @param indexShard replica shard on which checkpoint is received * @return true if checkpoint should be processed */ - private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexshard) { - if (indexshard.getState().equals(IndexShardState.STARTED) == false) { - logger.debug("Ignore - shard is not started {}", indexshard.getState()); + private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexShard) { + if (indexShard.state().equals(IndexShardState.STARTED) == false) { + logger.trace("Ignoring new replication checkpoint - shard is not started {}", indexShard.state()); return false; } - ReplicationCheckpoint localCheckpoint = indexshard.getLatestReplicationCheckpoint(); - logger.debug("Local Checkpoint {}", indexshard.getLatestReplicationCheckpoint()); - if (onGoingReplications.isShardReplicating(indexshard.shardId())) { - logger.debug("Ignore - shard is currently replicating to a checkpoint"); + ReplicationCheckpoint localCheckpoint = indexShard.getLatestReplicationCheckpoint(); + if (onGoingReplications.isShardReplicating(indexShard.shardId())) { + logger.trace("Ignoring new replication checkpoint - shard is currently replicating to a checkpoint"); return false; } if (localCheckpoint.isAheadOf(requestCheckpoint)) { - logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); + logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); return false; } if (localCheckpoint.equals(requestCheckpoint)) { - logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint); + logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint); return false; } return true; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7e205b88e9eb1..937a93f7211de 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -37,6 +37,8 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; import org.opensearch.index.IndexingPressureService; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -932,6 +934,8 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); + b.bind(SegmentReplicationTargetService.class) + .toInstance(new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService))); } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); From 45f80269c4dd702bfbd588fa3f1bcf92bdc3b1b3 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 9 Jun 2022 17:46:47 +0000 Subject: [PATCH 03/12] Applying spotless check Signed-off-by: Rishikesh1159 --- .../opensearch/indices/recovery/RecoverySettings.java | 1 - .../replication/SegmentReplicationSourceFactory.java | 1 - .../replication/SegmentReplicationTargetService.java | 6 +++++- server/src/main/java/org/opensearch/node/Node.java | 9 ++++++++- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 08dbbb194d16c..60076f1668af8 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -36,7 +36,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.store.RateLimiter.SimpleRateLimiter; -import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 453075bf7d24e..afbb80d263805 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -11,7 +11,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.RecoverySettings; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 6e5a5e3f57e4f..21b36c7e7547f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -146,7 +146,11 @@ private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, return false; } if (localCheckpoint.isAheadOf(requestCheckpoint)) { - logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); + logger.trace( + "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", + localCheckpoint, + requestCheckpoint + ); return false; } if (localCheckpoint.equals(requestCheckpoint)) { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 937a93f7211de..d1dbc2c9d06a7 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -935,7 +935,14 @@ protected Node( b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); b.bind(SegmentReplicationTargetService.class) - .toInstance(new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService))); + .toInstance( + new SegmentReplicationTargetService( + threadPool, + recoverySettings, + transportService, + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + ) + ); } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); From 2052ff2fcc63c483a6148bddbeff88d989083f96 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 14 Jun 2022 22:15:49 +0000 Subject: [PATCH 04/12] Moving shouldProcessCheckpoint() to IndexShard, and removing some trace logs Signed-off-by: Rishikesh1159 --- .../opensearch/index/shard/IndexShard.java | 27 ++++++++++++ .../SegmentReplicationTargetService.java | 42 +++---------------- 2 files changed, 33 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b6083580964a0..20d97c77672fd 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1393,6 +1393,33 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { ); } + /** + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed + */ + public boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + if (state().equals(IndexShardState.STARTED) == false) { + logger.trace("Ignoring new replication checkpoint - shard is not started {}", state()); + return false; + } + ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); + if (localCheckpoint.isAheadOf(requestCheckpoint)) { + logger.trace( + "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", + localCheckpoint, + requestCheckpoint + ); + return false; + } + if (localCheckpoint.equals(requestCheckpoint)) { + logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint); + return false; + } + return true; + } + /** * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard, * without having to worry about the current state of the engine and concurrent flushes. diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 21b36c7e7547f..51b06c6e1140a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -94,18 +94,20 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh * @param indexShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { - logger.trace("Checkpoint received {}", () -> requestCheckpoint); - if (shouldProcessCheckpoint(requestCheckpoint, indexShard)) { - logger.trace("Processing new checkpoint {}", requestCheckpoint); + if (onGoingReplications.isShardReplicating(indexShard.shardId())) { + logger.trace("Ignoring new replication checkpoint - shard is currently replicating to a checkpoint"); + return; + } + if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) { startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { - logger.trace("Replication complete to {}", indexShard.getLatestReplicationCheckpoint()); } @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { if (sendShardFailure == true) { + logger.error("replication failure", e); indexShard.failShard("replication failure", e); } } @@ -128,38 +130,6 @@ public void startReplication(final SegmentReplicationTarget target) { threadPool.generic().execute(new ReplicationRunner(replicationId)); } - /** - * Checks if checkpoint should be processed - * - * @param requestCheckpoint received checkpoint that is checked for processing - * @param indexShard replica shard on which checkpoint is received - * @return true if checkpoint should be processed - */ - private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexShard) { - if (indexShard.state().equals(IndexShardState.STARTED) == false) { - logger.trace("Ignoring new replication checkpoint - shard is not started {}", indexShard.state()); - return false; - } - ReplicationCheckpoint localCheckpoint = indexShard.getLatestReplicationCheckpoint(); - if (onGoingReplications.isShardReplicating(indexShard.shardId())) { - logger.trace("Ignoring new replication checkpoint - shard is currently replicating to a checkpoint"); - return false; - } - if (localCheckpoint.isAheadOf(requestCheckpoint)) { - logger.trace( - "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", - localCheckpoint, - requestCheckpoint - ); - return false; - } - if (localCheckpoint.equals(requestCheckpoint)) { - logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint); - return false; - } - return true; - } - /** * Listener that runs on changes in Replication state * From e9168e86b5daaea28626717af7c6411ace021386 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 14 Jun 2022 22:26:09 +0000 Subject: [PATCH 05/12] applying spotlessApply Signed-off-by: Rishikesh1159 --- .../indices/replication/SegmentReplicationTargetService.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 51b06c6e1140a..9c79fb3dc04ea 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -18,7 +18,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.RecoverySettings; @@ -101,8 +100,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestChec if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) { startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() { @Override - public void onReplicationDone(SegmentReplicationState state) { - } + public void onReplicationDone(SegmentReplicationState state) {} @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { From 3cda46cac5b85af16052e6772427ee7f35116e8e Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 15 Jun 2022 16:02:51 +0000 Subject: [PATCH 06/12] Adding more info to log statement in targetservice class Signed-off-by: Rishikesh1159 --- .../indices/replication/SegmentReplicationTargetService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 9c79fb3dc04ea..bda0fb936aebe 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -94,7 +94,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { if (onGoingReplications.isShardReplicating(indexShard.shardId())) { - logger.trace("Ignoring new replication checkpoint - shard is currently replicating to a checkpoint"); + logger.trace("Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", indexShard.getLatestReplicationCheckpoint()); return; } if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) { From d585645e358aeb31ea6ee7ecb2e3513a2c8bf195 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 15 Jun 2022 17:26:44 +0000 Subject: [PATCH 07/12] applying spotlessApply Signed-off-by: Rishikesh1159 --- .../indices/replication/SegmentReplicationTargetService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index bda0fb936aebe..1eead75d0f718 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -94,7 +94,10 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { if (onGoingReplications.isShardReplicating(indexShard.shardId())) { - logger.trace("Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", indexShard.getLatestReplicationCheckpoint()); + logger.trace( + "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", + indexShard.getLatestReplicationCheckpoint() + ); return; } if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) { From 9aec21548f6944e2d4bc5a468cc225f5d1976f92 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 22 Jun 2022 00:18:19 +0000 Subject: [PATCH 08/12] Addressing comments on PR Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/engine/Engine.java | 1 + .../opensearch/index/shard/IndexShard.java | 54 +++++++++++-------- .../org/opensearch/indices/IndicesModule.java | 28 +++++++++- .../SegmentReplicationTargetService.java | 22 ++++---- ...SegmentReplicationCheckpointPublisher.java | 3 +- .../main/java/org/opensearch/node/Node.java | 18 +++---- .../PublishCheckpointActionTests.java | 27 ++++++---- 7 files changed, 99 insertions(+), 54 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 4829148322b31..fa0c4673a437a 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -173,6 +173,7 @@ public final EngineConfig config() { * Return the latest active SegmentInfos from the engine. * @return {@link SegmentInfos} */ + @Nullable protected abstract SegmentInfos getLatestSegmentInfos(); /** diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 32d1db647732c..9441c133fba09 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1376,25 +1376,33 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } } - /** - * Returns the lastest segmentInfos - */ - public SegmentInfos getLatestSegmentInfos() { - return getEngine().getSegmentInfosSnapshot().get(); - } - /** * Returns the lastest Replication Checkpoint that shard received */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { - final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); - return new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - latestSegmentInfos.getGeneration(), - getProcessedLocalCheckpoint(), - latestSegmentInfos.getVersion() - ); + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return Optional.ofNullable(snapshot.get()) + .map( + segmentInfos -> new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + segmentInfos.getVersion() + ) + ) + .orElse( + new ReplicationCheckpoint( + shardId, + getOperationPrimaryTerm(), + SequenceNumbers.NO_OPS_PERFORMED, + getProcessedLocalCheckpoint(), + SequenceNumbers.NO_OPS_PERFORMED + ) + ); + } catch (IOException ex) { + throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex); + } } /** @@ -1403,22 +1411,26 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { * @param requestCheckpoint received checkpoint that is checked for processing * @return true if checkpoint should be processed */ - public boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { if (state().equals(IndexShardState.STARTED) == false) { - logger.trace("Ignoring new replication checkpoint - shard is not started {}", state()); + logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); return false; } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); if (localCheckpoint.isAheadOf(requestCheckpoint)) { logger.trace( - "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", - localCheckpoint, - requestCheckpoint + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", + localCheckpoint, + requestCheckpoint + ) ); return false; } if (localCheckpoint.equals(requestCheckpoint)) { - logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint); + logger.trace( + () -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint) + ); return false; } return true; diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 0cb2ff958c787..e7aa79d817ede 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -39,6 +39,8 @@ import org.opensearch.action.resync.TransportResyncReplicationAction; import org.opensearch.common.ParseField; import org.opensearch.common.inject.AbstractModule; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.inject.Provider; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry; import org.opensearch.common.util.FeatureFlags; @@ -74,6 +76,7 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.store.IndicesStore; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; @@ -281,7 +284,30 @@ protected void configure() { bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); bind(RetentionLeaseSyncer.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { - bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); + bind(SegmentReplicationCheckpointPublisher.class).toProvider(CheckpointPublisherProvider.class).asEagerSingleton(); + } else { + bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY); + } + } + + /** + * This provider is necessary while segment replication is behind a feature flag. + * We don't want to initialize a PublishCheckpointAction with the feature flag disabled. + * + * @opensearch.internal + */ + public final static class CheckpointPublisherProvider implements Provider { + + private final PublishCheckpointAction action; + + @Inject + public CheckpointPublisherProvider(PublishCheckpointAction action) { + this.action = action; + } + + @Override + public SegmentReplicationCheckpointPublisher get() { + return new SegmentReplicationCheckpointPublisher(action); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1eead75d0f718..c44b27911bb7a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -14,7 +14,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.common.Nullable; -import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -59,7 +58,6 @@ public static class Actions { public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; } - @Inject public SegmentReplicationTargetService( final ThreadPool threadPool, final RecoverySettings recoverySettings, @@ -89,19 +87,21 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. - * @param requestCheckpoint received checkpoint that is checked for processing - * @param indexShard replica shard on which checkpoint is received + * @param receivedCheckpoint received checkpoint that is checked for processing + * @param replicaShard replica shard on which checkpoint is received */ - public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { - if (onGoingReplications.isShardReplicating(indexShard.shardId())) { + public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { logger.trace( - "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", - indexShard.getLatestReplicationCheckpoint() + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", + replicaShard.getLatestReplicationCheckpoint() + ) ); return; } - if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) { - startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() { + if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { + startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) {} @@ -109,7 +109,7 @@ public void onReplicationDone(SegmentReplicationState state) {} public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { if (sendShardFailure == true) { logger.error("replication failure", e); - indexShard.failShard("replication failure", e); + replicaShard.failShard("replication failure", e); } } }); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index 2b09901a947fe..39b7a6fef1abd 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication.checkpoint; -import org.opensearch.common.inject.Inject; import org.opensearch.index.shard.IndexShard; import java.util.Objects; @@ -22,7 +21,7 @@ public class SegmentReplicationCheckpointPublisher { private final PublishAction publishAction; - @Inject + // This Component is behind feature flag so we are manually binding this in IndicesModule. public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { this(publishAction::publish); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 8c894e7b57616..4b4fdc974f8cb 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -937,16 +937,16 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); - b.bind(SegmentReplicationTargetService.class) - .toInstance( - new SegmentReplicationTargetService( - threadPool, - recoverySettings, - transportService, - new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) - ) - ); if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { + b.bind(SegmentReplicationTargetService.class) + .toInstance( + new SegmentReplicationTargetService( + threadPool, + recoverySettings, + transportService, + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + ) + ); b.bind(SegmentReplicationSourceService.class) .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); } diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index da6ab32cb2517..77cc1d744f0dc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -20,15 +20,15 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import java.io.IOException; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,7 +36,7 @@ import static org.mockito.Mockito.*; import static org.opensearch.test.ClusterServiceUtils.createClusterService; -public class PublishCheckpointActionTests extends IndexShardTestCase { +public class PublishCheckpointActionTests extends OpenSearchTestCase { private ThreadPool threadPool; private CapturingTransport transport; @@ -73,14 +73,19 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testPublishCheckpointActionOnPrimary() throws InterruptedException, IOException { + public void testPublishCheckpointActionOnPrimary() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final IndexShard indexShard = newShard(true); + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); @@ -103,18 +108,21 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException, // we should forward the request containing the current publish checkpoint to the replica assertThat(result.replicaRequest(), sameInstance(request)); })); - closeShards(indexShard); } - public void testPublishCheckpointActionOnReplica() throws IOException { + public void testPublishCheckpointActionOnReplica() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); - final IndexShard indexShard = newShard(false); + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); @@ -138,13 +146,12 @@ public void testPublishCheckpointActionOnReplica() throws IOException { final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // onNewCheckpoint should be called on shard with checkpoint request - verify(mockTargetService).onNewCheckpoint(checkpoint, indexShard); + verify(mockTargetService, times(1)).onNewCheckpoint(checkpoint, indexShard); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); assertTrue(success.get()); - closeShards(indexShard); } From 512ba04bc2ddd5abd28c2824feed394a4db93c4f Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 22 Jun 2022 15:35:14 +0000 Subject: [PATCH 09/12] Adding teardown() in SegmentReplicationTargetServiceTests. Signed-off-by: Rishikesh1159 --- .../SegmentReplicationTargetServiceTests.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 9e81f12355bbd..0423474cb6dad 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -49,7 +49,13 @@ public void setUp() throws Exception { sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory); } - public void testTargetReturnsSuccess_listenerCompletes() throws IOException { + @Override + public void tearDown() throws Exception { + closeShards(indexShard); + super.tearDown(); + } + + public void testTargetReturnsSuccess_listenerCompletes() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -73,10 +79,9 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept return null; }).when(spy).startReplication(any()); sut.startReplication(spy); - closeShards(indexShard); } - public void testTargetThrowsException() throws IOException { + public void testTargetThrowsException() { final OpenSearchException expectedError = new OpenSearchException("Fail"); final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, @@ -103,17 +108,15 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept return null; }).when(spy).startReplication(any()); sut.startReplication(spy); - closeShards(indexShard); } - public void testAlreadyOnNewCheckpoint() throws IOException { + public void testAlreadyOnNewCheckpoint() { SegmentReplicationTargetService spy = spy(sut); spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); verify(spy, times(0)).startReplication(any(), any(), any()); - closeShards(indexShard); } - public void testShardAlreadyReplicating() throws IOException { + public void testShardAlreadyReplicating() { SegmentReplicationTargetService spy = spy(sut); final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, @@ -125,14 +128,12 @@ public void testShardAlreadyReplicating() throws IOException { spy.startReplication(spyTarget); spy.onNewCheckpoint(checkpoint, indexShard); verify(spy, times(0)).startReplication(any(), any(), any()); - closeShards(indexShard); } - public void testNewCheckpointBehindCurrentCheckpoint() throws IOException { + public void testNewCheckpointBehindCurrentCheckpoint() { SegmentReplicationTargetService spy = spy(sut); spy.onNewCheckpoint(checkpoint, indexShard); verify(spy, times(0)).startReplication(any(), any(), any()); - closeShards(indexShard); } public void testShardNotStarted() throws IOException { @@ -141,7 +142,6 @@ public void testShardNotStarted() throws IOException { spy.onNewCheckpoint(checkpoint, shard); verify(spy, times(0)).startReplication(any(), any(), any()); closeShards(shard); - closeShards(indexShard); } public void testShouldProcessCheckpoint() throws IOException { @@ -168,7 +168,7 @@ public void testShouldProcessCheckpoint() throws IOException { } - public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException { + public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -179,6 +179,5 @@ public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOExc sut.startReplication(spy); sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY); Mockito.verify(spy, times(1)).cancel(any()); - closeShards(indexShard); } } From 8b22a661fda8ddcb57c5c62dbe1ee1f5d08d940f Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 22 Jun 2022 20:28:56 +0000 Subject: [PATCH 10/12] fixing testShouldProcessCheckpoint() in SegmentReplicationTargetServiceTests Signed-off-by: Rishikesh1159 --- .../opensearch/index/shard/IndexShard.java | 1 - .../SegmentReplicationTargetServiceTests.java | 20 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 24073e9c9ecc1..d25847dde235c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -162,7 +162,6 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1fa7d669e53f7..33734fe85def5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -19,15 +19,13 @@ import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.transport.TransportService; import java.io.IOException; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -125,6 +123,7 @@ public void testAlreadyOnNewCheckpoint() { public void testShardAlreadyReplicating() { SegmentReplicationTargetService spy = spy(sut); + // Create a separate target and start it so the shard is already replicating. final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -133,8 +132,11 @@ public void testShardAlreadyReplicating() { ); final SegmentReplicationTarget spyTarget = Mockito.spy(target); spy.startReplication(spyTarget); + + // a new checkpoint comes in for the same IndexShard. spy.onNewCheckpoint(checkpoint, indexShard); verify(spy, times(0)).startReplication(any(), any(), any()); + spyTarget.markAsDone(); } public void testNewCheckpointBehindCurrentCheckpoint() { @@ -151,7 +153,7 @@ public void testShardNotStarted() throws IOException { closeShards(shard); } - public void testShouldProcessCheckpoint() throws IOException { + public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOException { allowShardFailures(); SegmentReplicationTargetService spy = spy(sut); IndexShard spyShard = spy(indexShard); @@ -163,16 +165,16 @@ public void testShouldProcessCheckpoint() throws IOException { cp.getSeqNo(), cp.getSegmentInfosVersion() + 1 ); - spy.onNewCheckpoint(newCheckpoint, spyShard); ArgumentCaptor captor = ArgumentCaptor.forClass( SegmentReplicationTargetService.SegmentReplicationListener.class ); + doNothing().when(spy).startReplication(any(), any(), any()); + spy.onNewCheckpoint(newCheckpoint, spyShard); verify(spy, times(1)).startReplication(any(), any(), captor.capture()); SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); - listener.onFailure(new SegmentReplicationState(), new OpenSearchException("testing"), true); + listener.onFailure(new SegmentReplicationState(new ReplicationLuceneIndex()), new OpenSearchException("testing"), true); verify(spyShard).failShard(any(), any()); closeShard(indexShard, false); - } public void testBeforeIndexShardClosed_CancelsOngoingReplications() { @@ -185,6 +187,6 @@ public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget spy = Mockito.spy(target); sut.startReplication(spy); sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY); - Mockito.verify(spy, times(1)).cancel(any()); + verify(spy, times(1)).cancel(any()); } } From 33564c3a5d9936c45b78fb4e60d01b3e0f4de7a4 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 22 Jun 2022 22:25:05 +0000 Subject: [PATCH 11/12] Removing CheckpointPublisherProvider in IndicesModule Signed-off-by: Rishikesh1159 --- .../org/opensearch/indices/IndicesModule.java | 23 +------------------ ...SegmentReplicationCheckpointPublisher.java | 2 ++ 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index e7aa79d817ede..51271b002a66f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -284,33 +284,12 @@ protected void configure() { bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); bind(RetentionLeaseSyncer.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { - bind(SegmentReplicationCheckpointPublisher.class).toProvider(CheckpointPublisherProvider.class).asEagerSingleton(); + bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); } else { bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY); } } - /** - * This provider is necessary while segment replication is behind a feature flag. - * We don't want to initialize a PublishCheckpointAction with the feature flag disabled. - * - * @opensearch.internal - */ - public final static class CheckpointPublisherProvider implements Provider { - - private final PublishCheckpointAction action; - - @Inject - public CheckpointPublisherProvider(PublishCheckpointAction action) { - this.action = action; - } - - @Override - public SegmentReplicationCheckpointPublisher get() { - return new SegmentReplicationCheckpointPublisher(action); - } - } - /** * A registry for all field mappers. */ diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index 39b7a6fef1abd..6be524cea140e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.checkpoint; +import org.opensearch.common.inject.Inject; import org.opensearch.index.shard.IndexShard; import java.util.Objects; @@ -22,6 +23,7 @@ public class SegmentReplicationCheckpointPublisher { private final PublishAction publishAction; // This Component is behind feature flag so we are manually binding this in IndicesModule. + @Inject public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { this(publishAction::publish); } From 37bf71bfdf22f16342db03e43e39c745f4d71571 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 22 Jun 2022 22:56:08 +0000 Subject: [PATCH 12/12] spotless check apply Signed-off-by: Rishikesh1159 --- server/src/main/java/org/opensearch/indices/IndicesModule.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 51271b002a66f..29ff507ad9fcf 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -39,8 +39,6 @@ import org.opensearch.action.resync.TransportResyncReplicationAction; import org.opensearch.common.ParseField; import org.opensearch.common.inject.AbstractModule; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.inject.Provider; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry; import org.opensearch.common.util.FeatureFlags; @@ -76,7 +74,6 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.mapper.MapperRegistry; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.store.IndicesStore; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;