diff --git a/server/src/main/java/org/opensearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/opensearch/action/admin/indices/flush/TransportShardFlushAction.java index 8c5604dfac220..e267c5e224581 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -98,7 +98,7 @@ protected void shardOperationOnPrimary( @Override protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener listener) { ActionListener.completeWith(listener, () -> { -// replica.flush(request.getRequest()); + replica.flush(request.getRequest()); logger.trace("{} flush request executed on replica", replica.shardId()); return new ReplicaResult(); }); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index a48bc0f311ed7..36fcae1d93d62 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -508,7 +508,7 @@ public final class IndexSettings { */ public static final Setting INDEX_SEGMENT_REPLICATION_SETTING = Setting.boolSetting( "index.replication.segment_replication", - true, + false, Property.IndexScope, Property.Final ); diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 29c92db53cf65..1d490f5f84d24 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -1200,8 +1200,18 @@ public abstract void forceMerge( */ public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException; + /** + * Fetch a snapshot of the latest SegmentInfos from the engine and ensure that segment files are retained in the directory + * until closed. + * @return {@link SegmentInfosRef} - A ref to segmentInfos that must be closed for segment files to be deleted. + */ public SegmentInfosRef getLatestSegmentInfosSafe() { return null; }; + /** + * Fetch a snapshot of the latest SegmentInfos from the engine. + * This method does not ensure that segment files are retained in the directory. + * @return {@link SegmentInfos} + */ public SegmentInfos getLatestSegmentInfos() { return null; }; /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 48332cf31fdf1..70a7926214b98 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -352,7 +352,7 @@ public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws I toIndexInput(infosBytes), gen); assert gen == infos.getGeneration(); - externalReaderManager.internalReaderManager.setCurrentInfos(infos); + externalReaderManager.internalReaderManager.updateSegments(infos); externalReaderManager.maybeRefresh(); localCheckpointTracker.markSeqNoAsProcessed(seqNo); } @@ -2110,6 +2110,9 @@ public boolean shouldPeriodicallyFlush() { @Override public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { + if (engineConfig.isPrimary() == false) { + return new CommitId(lastCommittedSegmentInfos.getId()); + } ensureOpen(); if (force && waitIfOngoing == false) { assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing; diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index a6447c2aab4a1..c1c3f37e1bbe9 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -121,7 +121,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re * Switch to new segments, refreshing if necessary. Note that it's the caller job to ensure * there's a held refCount for the incoming infos, so all files exist. */ - public synchronized void setCurrentInfos(SegmentInfos infos) throws IOException { + public synchronized void updateSegments(SegmentInfos infos) throws IOException { currentInfos = infos; maybeRefresh(); } diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 9563bcd9100ce..1c92396c167ad 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -17,6 +17,7 @@ /** * A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas. + * This class is only used with Segment Replication enabled. */ public class CheckpointRefreshListener implements ReferenceManager.RefreshListener { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index bb6f8c3ea5729..4b0dcd2d8d807 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -160,7 +160,7 @@ import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.SegmentReplicationService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; @@ -1507,10 +1507,20 @@ public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws E } } + /** + * Fetch a ref to the latest SegmentInfos held by the engine. This ensures the files will not be deleted until + * the ref is closed. + * @return {@link Engine.SegmentInfosRef} + * @throws EngineException - When infos cannot be retrieved from the Engine. + */ public Engine.SegmentInfosRef getLatestSegmentInfosSafe() throws EngineException { return getEngine().getLatestSegmentInfosSafe(); } + /** + * Fetch a snapshot of the latest SegmentInfos held by the engine. + * @return {@link SegmentInfos} + */ public SegmentInfos getLatestSegmentInfos() { return getEngine().getLatestSegmentInfos(); } @@ -1523,10 +1533,10 @@ public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws I getEngine().updateCurrentInfos(infosBytes, gen, seqNo); } - /** - * Snapshots the most recent safe index commit from the currently running engine. - * All index files referenced by this index commit won't be freed until the commit/snapshot is closed. - */ + /** + * Snapshots the most recent safe index commit from the currently running engine. + * All index files referenced by this index commit won't be freed until the commit/snapshot is closed. + */ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException { final IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine @@ -3048,8 +3058,8 @@ protected Engine getEngineOrNull() { public void startRecovery( RecoveryState recoveryState, - SegmentReplicationService segmentReplicationService, - SegmentReplicationService.ReplicationListener replicationListener, + SegmentReplicationReplicaService segmentReplicationReplicaService, + SegmentReplicationReplicaService.ReplicationListener replicationListener, PrimaryShardReplicationSource replicationSource, PeerRecoveryTargetService peerRecoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, @@ -3083,7 +3093,7 @@ public void startRecovery( try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); IndexShard indexShard = this; - segmentReplicationService.prepareForReplication(this, recoveryState.getTargetNode(), recoveryState.getSourceNode(), new ActionListener() { + segmentReplicationReplicaService.prepareForReplication(this, recoveryState.getTargetNode(), recoveryState.getSourceNode(), new ActionListener() { @Override public void onResponse(TrackShardResponse unused) { replicationListener.onReplicationDone(replicationState); @@ -3097,7 +3107,7 @@ public void onFailure(Exception e) { } }); } catch (Exception e) { - logger.error("Error", e); + logger.error("Error preparing the shard for Segment replication", e); failShard("corrupted preexisting index", e); recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } @@ -3711,7 +3721,7 @@ public long getProcessedLocalCheckpoint() { */ public synchronized void onNewCheckpoint(final PublishCheckpointRequest request, final PrimaryShardReplicationSource source, - final SegmentReplicationService segmentReplicationService) { + final SegmentReplicationReplicaService segmentReplicationReplicaService) { logger.debug("Checkpoint received {}", request.getCheckpoint()); ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint()); @@ -3732,16 +3742,16 @@ public synchronized void onNewCheckpoint(final PublishCheckpointRequest request, final ReplicationCheckpoint checkpoint = request.getCheckpoint(); logger.trace("Received new checkpoint {}", checkpoint); // TODO: segrep - these are the states set after we perform our initial store recovery. - segmentReplicationService.startReplication(checkpoint, this, source, new SegmentReplicationService.ReplicationListener() { + segmentReplicationReplicaService.startReplication(checkpoint, this, source, new SegmentReplicationReplicaService.ReplicationListener() { @Override public void onReplicationDone(ReplicationState state) { - finalizeReplication(); + markReplicationComplete(); logger.debug("Replication complete to {}", getLatestReplicationCheckpoint()); } @Override public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - finalizeReplication(); + markReplicationComplete(); logger.error("Failure", e); } }); @@ -3758,7 +3768,7 @@ public void markAsReplicating() { this.replicationState.setStage(ReplicationState.Stage.ACTIVE); } - public void finalizeReplication() { + public void markReplicationComplete() { this.replicationState.setStage(ReplicationState.Stage.INACTIVE); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 341df65205f9c..f9b7c7aa49ee1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -137,7 +137,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.SegmentReplicationService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.node.Node; @@ -836,8 +836,8 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada @Override public IndexShard createShard( final ShardRouting shardRouting, - final SegmentReplicationService replicaService, - final SegmentReplicationService.ReplicationListener replicationListener, + final SegmentReplicationReplicaService segmentReplicationReplicaService, + final SegmentReplicationReplicaService.ReplicationListener replicationListener, final PrimaryShardReplicationSource replicationSource, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, @@ -855,7 +855,7 @@ public IndexShard createShard( RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); indexShard.addShardFailureCallback(onShardFailure); - indexShard.startRecovery(recoveryState, replicaService, replicationListener, replicationSource, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> { + indexShard.startRecovery(recoveryState, segmentReplicationReplicaService, replicationListener, replicationSource, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> { assert recoveryState.getRecoverySource() .getType() == RecoverySource.Type.LOCAL_SHARDS : "mapping update consumer only required by local shards recovery"; client.admin() diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index a441c4006f015..29f0a5f493be4 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -81,7 +81,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.SegmentReplicationService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationFailedException; import org.opensearch.indices.replication.copy.ReplicationState; @@ -118,7 +118,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final ClusterService clusterService; private final ThreadPool threadPool; - private final SegmentReplicationService replicationReplicaService; + private final SegmentReplicationReplicaService segmentReplicationReplicaService; private final PrimaryShardReplicationSource replicationSource; private final PeerRecoveryTargetService recoveryTargetService; private final ShardStateAction shardStateAction; @@ -157,7 +157,7 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationService replicationReplicaService, + final SegmentReplicationReplicaService replicationReplicaService, final PrimaryShardReplicationSource replicationSource) { this( settings, @@ -186,7 +186,7 @@ public IndicesClusterStateService( final AllocatedIndices> indicesService, final ClusterService clusterService, final ThreadPool threadPool, - final SegmentReplicationService replicationReplicaService, + final SegmentReplicationReplicaService replicationReplicaService, final PrimaryShardReplicationSource replicationSource, final PeerRecoveryTargetService recoveryTargetService, final ShardStateAction shardStateAction, @@ -201,7 +201,7 @@ public IndicesClusterStateService( final RetentionLeaseSyncer retentionLeaseSyncer ) { this.settings = settings; - this.replicationReplicaService = replicationReplicaService; + this.segmentReplicationReplicaService = replicationReplicaService; this.replicationSource = replicationSource; this.buildInIndexListener = Arrays.asList( peerRecoverySourceService, @@ -638,7 +638,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); indicesService.createShard( shardRouting, - replicationReplicaService, + segmentReplicationReplicaService, new ReplicationListener(shardRouting, primaryTerm), replicationSource, recoveryTargetService, @@ -759,7 +759,7 @@ private static DiscoveryNode findSourceNodeForPeerRecovery( } - private class ReplicationListener implements SegmentReplicationService.ReplicationListener { + private class ReplicationListener implements SegmentReplicationReplicaService.ReplicationListener { /** * ShardRouting with which the shard was created @@ -1038,8 +1038,8 @@ U createIndex(IndexMetadata indexMetadata, List builtInIndex */ T createShard( ShardRouting shardRouting, - SegmentReplicationService replicaService, - SegmentReplicationService.ReplicationListener replicationListener, + SegmentReplicationReplicaService replicaService, + SegmentReplicationReplicaService.ReplicationListener replicationListener, PrimaryShardReplicationSource replicationSource, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, diff --git a/server/src/main/java/org/opensearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/opensearch/indices/flush/SyncedFlushService.java index e96d700b992ce..88c1fd03d9c54 100644 --- a/server/src/main/java/org/opensearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/opensearch/indices/flush/SyncedFlushService.java @@ -164,9 +164,6 @@ public void onFailure(Exception e) { } private void performNormalFlushOnInactive(IndexShard shard) { - if (shard.routingEntry().primary() == false) { - return; - } logger.debug("flushing shard {} on inactive", shard.routingEntry()); shard.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() { @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index d3ccef841b90b..4e8c10d52efbd 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -79,7 +79,6 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF final Map tempFileNames = ConcurrentCollections.newConcurrentMap(); - public void writeFileChunk(StoreFileMetadata fileMetadata, long position, BytesReference content, boolean lastChunk) throws IOException { assert Transports.assertNotTransportThread("multi_file_writer"); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java similarity index 96% rename from server/src/main/java/org/opensearch/indices/replication/SegmentReplicationService.java rename to server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java index 3f9f9db5f92d1..781b185af2ae8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java @@ -56,9 +56,9 @@ /** * Orchestrator of replication events. */ -public class SegmentReplicationService implements IndexEventListener { +public class SegmentReplicationReplicaService implements IndexEventListener { - private static final Logger logger = LogManager.getLogger(SegmentReplicationService.class); + private static final Logger logger = LogManager.getLogger(SegmentReplicationReplicaService.class); private final ThreadPool threadPool; private final RecoverySettings recoverySettings; @@ -70,9 +70,9 @@ public ReplicationCollection getOnGoingReplications() { private final ReplicationCollection onGoingReplications; - public SegmentReplicationService(final ThreadPool threadPool, - final RecoverySettings recoverySettings, - final TransportService transportService) { + public SegmentReplicationReplicaService(final ThreadPool threadPool, + final RecoverySettings recoverySettings, + final TransportService transportService) { this.threadPool = threadPool; this.recoverySettings = recoverySettings; this.transportService = transportService; @@ -145,7 +145,7 @@ private void setupReplicaShard(IndexShard indexShard) throws IndexShardRecoveryE public void startReplication(final ReplicationCheckpoint checkpoint, final IndexShard indexShard, PrimaryShardReplicationSource source, final ReplicationListener listener) { final long replicationId = onGoingReplications.startReplication(checkpoint, indexShard, source, listener, recoverySettings.activityTimeout()); - logger.info("Starting replication {}", replicationId); + logger.trace("Starting replication {}", replicationId); threadPool.generic().execute(new ReplicationRunner(replicationId)); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java index 4515d053fa311..bf78f91b5d16e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java @@ -21,7 +21,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.SegmentReplicationService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -37,7 +37,7 @@ public class TransportPublishShardCheckpointAction extends TransportReplicationA public static final String ACTION_NAME = PublishCheckpointAction.NAME + "[s]"; - private final SegmentReplicationService replicationService; + private final SegmentReplicationReplicaService replicationService; private final PrimaryShardReplicationSource source; @Inject @@ -49,7 +49,7 @@ public TransportPublishShardCheckpointAction( ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, - SegmentReplicationService segmentCopyService, + SegmentReplicationReplicaService segmentCopyService, PrimaryShardReplicationSource source) { super( settings, diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java index 566a7921830e5..5afa854710585 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java @@ -36,7 +36,7 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.SegmentReplicationService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.TransportCheckpointInfoResponse; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -69,7 +69,7 @@ public class PrimaryShardReplicationSource { private final Map> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap(); private final ThreadPool threadPool; private final RecoverySettings recoverySettings; - private final SegmentReplicationService segmentReplicationService; + private final SegmentReplicationReplicaService segmentReplicationReplicaService; // TODO: Segrep - Cancellation doesn't make sense here as it should be per replication event. private volatile boolean isCancelled = false; @@ -82,13 +82,13 @@ public PrimaryShardReplicationSource(TransportService transportService, ClusterService clusterService, IndicesService indicesService, RecoverySettings recoverySettings, - SegmentReplicationService segmentReplicationReplicaShardService) { + SegmentReplicationReplicaService segmentReplicationReplicaShardService) { this.transportService = transportService; this.clusterService = clusterService; this.indicesService = indicesService; this.recoverySettings = recoverySettings; this.threadPool = transportService.getThreadPool(); - this.segmentReplicationService = segmentReplicationReplicaShardService; + this.segmentReplicationReplicaService = segmentReplicationReplicaShardService; transportService.registerRequestHandler( Actions.FILE_CHUNK, @@ -205,7 +205,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler listener = createOrFinishListener(replicationRef, channel, Actions.FILE_CHUNK, request); if (listener == null) { diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java index 217b5b8f99bdb..4687eb48924f6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java @@ -41,7 +41,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.ShardId; -import org.opensearch.indices.replication.SegmentReplicationService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -75,7 +75,7 @@ public long startReplication( ReplicationCheckpoint checkpoint, IndexShard indexShard, PrimaryShardReplicationSource source, - SegmentReplicationService.ReplicationListener listener, + SegmentReplicationReplicaService.ReplicationListener listener, TimeValue activityTimeout ) { ReplicationTarget replicationTarget = new ReplicationTarget(checkpoint, indexShard, source, listener); diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java index e174d5238abdc..e276630bd2fbd 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java @@ -52,7 +52,7 @@ import org.opensearch.indices.recovery.MultiFileWriter; import org.opensearch.indices.recovery.RecoveryRequestTracker; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.SegmentReplicationService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.TransportCheckpointInfoResponse; import java.io.IOException; @@ -80,7 +80,7 @@ public ReplicationCheckpoint getCheckpoint() { private final IndexShard indexShard; private final Logger logger; private final PrimaryShardReplicationSource source; - private final SegmentReplicationService.ReplicationListener listener; + private final SegmentReplicationReplicaService.ReplicationListener listener; private final Store store; private final MultiFileWriter multiFileWriter; private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker(); @@ -96,7 +96,7 @@ public ReplicationCheckpoint getCheckpoint() { * @param source source of the recovery where we recover from * @param listener called when recovery is completed/failed */ - public ReplicationTarget(ReplicationCheckpoint checkpoint, IndexShard indexShard, PrimaryShardReplicationSource source, SegmentReplicationService.ReplicationListener listener) { + public ReplicationTarget(ReplicationCheckpoint checkpoint, IndexShard indexShard, PrimaryShardReplicationSource source, SegmentReplicationReplicaService.ReplicationListener listener) { super("replication_status"); this.checkpoint = checkpoint; this.indexShard = indexShard; diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java index d2f2a8d2c1d46..6a6de03b2fab8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java @@ -119,11 +119,7 @@ public void addCopyState(CopyState copyState) { } public CopyState getCopyStateForCheckpoint(ReplicationCheckpoint checkpoint) { - final CopyState copyState = checkpointCopyState.get(checkpoint); - if (copyState != null) { - copyState.incRef(); - } - return copyState; + return checkpointCopyState.get(checkpoint); } public boolean hasCheckpoint(ReplicationCheckpoint checkpoint) { @@ -134,6 +130,7 @@ public void removeCopyState(ReplicationCheckpoint checkpoint) { final Optional nrtCopyState = Optional.ofNullable(checkpointCopyState.get(checkpoint)); nrtCopyState.ifPresent((state) -> { if (state.decRef()) { + // decRef() returns true if there are no longer any references, if so remove it from our cache. checkpointCopyState.remove(checkpoint); } }); @@ -142,7 +139,9 @@ public void removeCopyState(ReplicationCheckpoint checkpoint) { private CopyState getCopyState(ReplicationCheckpoint checkpoint) throws IOException { if (commitCache.hasCheckpoint(checkpoint)) { - return commitCache.getCopyStateForCheckpoint(checkpoint); + final CopyState copyState = commitCache.getCopyStateForCheckpoint(checkpoint); + copyState.incRef(); + return copyState; } final CopyState copyState = buildCopyState(checkpoint.getShardId()); commitCache.addCopyState(copyState); @@ -159,6 +158,7 @@ private class StartReplicationRequestHandler implements TransportRequestHandler< @Override public void messageReceived(StartReplicationRequest request, TransportChannel channel, Task task) throws Exception { final ReplicationCheckpoint checkpoint = request.getCheckpoint(); + logger.trace("Received request for checkpoint {}", checkpoint); final CopyState copyState = getCopyState(checkpoint); channel.sendResponse(new TransportCheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataSnapshot(), copyState.getInfosBytes())); } @@ -177,7 +177,7 @@ public void messageReceived(GetFilesRequest request, TransportChannel channel, T private void sendFiles(GetFilesRequest request, ActionListener listener) { final ShardId shardId = request.getCheckpoint().getShardId(); - logger.trace("Requested checkpoint {}", request.getCheckpoint()); + logger.trace("Requested file copy for checkpoint {}", request.getCheckpoint()); final CopyState copyState = commitCache.getCopyStateForCheckpoint(request.getCheckpoint()); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 018ce8b1913b0..9633168c7b4f7 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -37,7 +37,7 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; import org.opensearch.index.IndexingPressureService; -import org.opensearch.indices.replication.SegmentReplicationService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.SegmentReplicationPrimaryService; import org.opensearch.watcher.ResourceWatcherService; @@ -931,9 +931,9 @@ protected Node( } b.bind(SegmentReplicationPrimaryService.class) .toInstance(new SegmentReplicationPrimaryService(transportService, indicesService, recoverySettings)); - final SegmentReplicationService segmentReplicationService = new SegmentReplicationService(threadPool, recoverySettings, transportService); - b.bind(SegmentReplicationService.class).toInstance(segmentReplicationService); - b.bind(PrimaryShardReplicationSource.class).toInstance(new PrimaryShardReplicationSource(transportService, clusterService, indicesService, recoverySettings, segmentReplicationService)); + final SegmentReplicationReplicaService segmentReplicationReplicaService = new SegmentReplicationReplicaService(threadPool, recoverySettings, transportService); + b.bind(SegmentReplicationReplicaService.class).toInstance(segmentReplicationReplicaService); + b.bind(PrimaryShardReplicationSource.class).toInstance(new PrimaryShardReplicationSource(transportService, clusterService, indicesService, recoverySettings, segmentReplicationReplicaService)); b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); b.bind(PersistentTasksService.class).toInstance(persistentTasksService);