diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c0e0177fbd3fd..a5e8b0bec079d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -167,6 +167,7 @@ import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.indices.replication.copy.ReplicationFailedException; import org.opensearch.indices.replication.copy.ReplicationState; +import org.opensearch.indices.replication.copy.TrackShardResponse; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -3081,8 +3082,19 @@ public void startRecovery( case PEER: try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); - markAsReplicating(); - segmentReplicationService.startReplication(new ReplicationCheckpoint(this.shardId, -10, -10, 0), this, replicationSource, replicationListener); + segmentReplicationService.prepareForReplication(this, recoveryState.getTargetNode(), recoveryState.getSourceNode(), new ActionListener() { + @Override + public void onResponse(TrackShardResponse unused) { + replicationListener.onReplicationDone(replicationState); + recoveryState.getIndex().setFileDetailsComplete(); + finalizeRecovery(); + postRecovery("Segrep recovery complete."); + } + @Override + public void onFailure(Exception e) { + logger.error("fail", e); + } + }); } catch (Exception e) { logger.error("Error", e); failShard("corrupted preexisting index", e); @@ -3692,32 +3704,37 @@ public void sync() throws IOException { /** * Invoked when a new checkpoint is received from a primary shard. Starts the copy process. */ - public void onNewCheckpoint(final PublishCheckpointRequest request, + public synchronized void onNewCheckpoint(final PublishCheckpointRequest request, final PrimaryShardReplicationSource source, final SegmentReplicationService segmentReplicationService) { - if (this.replicationState.getStage() == ReplicationState.Stage.INACTIVE) { - try { - final ReplicationCheckpoint checkpoint = request.getCheckpoint(); - logger.trace("Received new checkpoint {}", checkpoint); - // TODO: segrep - these are the states set after we perform our initial store recovery. - if (RecoveryState.Stage.TRANSLOG != this.recoveryState.getStage() && this.state.equals(IndexShardState.RECOVERING) == false) { - logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state); - return; + if (state.equals(IndexShardState.STARTED) == false) { + logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state); + return; + } + if (isReplicating()) { + logger.info("Ignore - shard is currently replicating to a checkpoint"); + return; + } + try { + markAsReplicating(); + final ReplicationCheckpoint checkpoint = request.getCheckpoint(); + logger.trace("Received new checkpoint {}", checkpoint); + // TODO: segrep - these are the states set after we perform our initial store recovery. + segmentReplicationService.startReplication(checkpoint, this, source, new SegmentReplicationService.ReplicationListener() { + @Override + public void onReplicationDone(ReplicationState state) { + finalizeReplication(); + logger.info("Replication complete."); } - segmentReplicationService.startReplication(checkpoint, this, source, new SegmentReplicationService.ReplicationListener() { - @Override - public void onReplicationDone(ReplicationState state) { - logger.trace("Replication complete."); - } - @Override - public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - logger.error("Failure", e); - } - }); - } catch (Exception e) { - logger.error("Error", e); - } + @Override + public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + finalizeReplication(); + logger.error("Failure", e); + } + }); + } catch (Exception e) { + logger.error("Error", e); } } @@ -3726,11 +3743,15 @@ public ReplicationState getReplicationState() { } public void markAsReplicating() { - replicationState.setStage(ReplicationState.Stage.ACTIVE); + this.replicationState.setStage(ReplicationState.Stage.ACTIVE); } public void finalizeReplication() { - replicationState.setStage(ReplicationState.Stage.INACTIVE); + this.replicationState.setStage(ReplicationState.Stage.INACTIVE); + } + + private boolean isReplicating() { + return this.replicationState.getStage() == ReplicationState.Stage.ACTIVE; } /** diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index a71fac14df98f..a441c4006f015 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -639,7 +639,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR indicesService.createShard( shardRouting, replicationReplicaService, - new ReplicationListener(), + new ReplicationListener(shardRouting, primaryTerm), replicationSource, recoveryTargetService, new RecoveryListener(shardRouting, primaryTerm), @@ -761,16 +761,31 @@ private static DiscoveryNode findSourceNodeForPeerRecovery( private class ReplicationListener implements SegmentReplicationService.ReplicationListener { - private ReplicationListener() {} + /** + * ShardRouting with which the shard was created + */ + private final ShardRouting shardRouting; + + /** + * Primary term with which the shard was created + */ + private final long primaryTerm; + + private ReplicationListener(final ShardRouting shardRouting, final long primaryTerm) { + this.shardRouting = shardRouting; + this.primaryTerm = primaryTerm; + } @Override public void onReplicationDone(final ReplicationState state) { - logger.info("Replication Done"); + logger.info("Shard setup complete, ready for segment copy."); + shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER); } @Override public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - logger.error("Replication failed", e); + handleRecoveryFailure(shardRouting, sendShardFailure, e); + logger.error("Shard setup failed", e); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationService.java index 3edad128e7495..d7d403b5f0266 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationService.java @@ -13,14 +13,23 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRunnable; +import org.opensearch.index.engine.EngineException; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardRecoveryException; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; +import org.opensearch.index.translog.Translog; +import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.Timer; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; @@ -29,7 +38,16 @@ import org.opensearch.indices.replication.copy.ReplicationFailedException; import org.opensearch.indices.replication.copy.ReplicationState; import org.opensearch.indices.replication.copy.ReplicationTarget; +import org.opensearch.indices.replication.copy.SegmentReplicationPrimaryService; +import org.opensearch.indices.replication.copy.TrackShardRequest; +import org.opensearch.indices.replication.copy.TrackShardResponse; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; /** * Orchestrator of replication events. @@ -40,6 +58,7 @@ public class SegmentReplicationService implements IndexEventListener { private final ThreadPool threadPool; private final RecoverySettings recoverySettings; + private final TransportService transportService; public ReplicationCollection getOnGoingReplications() { return onGoingReplications; @@ -48,9 +67,11 @@ public ReplicationCollection getOnGoingReplications() { private final ReplicationCollection onGoingReplications; public SegmentReplicationService(final ThreadPool threadPool, - final RecoverySettings recoverySettings) { + final RecoverySettings recoverySettings, + final TransportService transportService) { this.threadPool = threadPool; this.recoverySettings = recoverySettings; + this.transportService = transportService; this.onGoingReplications = new ReplicationCollection(logger, threadPool); } @@ -61,8 +82,36 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + public void prepareForReplication(IndexShard indexShard, DiscoveryNode targetNode, DiscoveryNode sourceNode, ActionListener listener) { + setupReplicaShard(indexShard); + transportService.sendRequest(sourceNode, + SegmentReplicationPrimaryService.Actions.TRACK_SHARD, + new TrackShardRequest(indexShard.shardId(), indexShard.routingEntry().allocationId().getId(), targetNode), + new ActionListenerResponseHandler<>(listener, TrackShardResponse::new)); + } + + private void setupReplicaShard(IndexShard indexShard) throws IndexShardRecoveryException { + indexShard.prepareForIndexRecovery(); + final Store store = indexShard.store(); + store.incRef(); + try { + store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); + final String translogUUID = Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, indexShard.shardId(), + indexShard.getPendingPrimaryTerm()); + store.associateIndexWithNewTranslog(translogUUID); + indexShard.persistRetentionLeases(); + indexShard.openEngineAndRecoverFromTranslog(); + } catch (EngineException | IOException e) { + throw new IndexShardRecoveryException(indexShard.shardId(), "failed to start replica shard", e); + } finally { + store.decRef(); + } + } + public void startReplication(final ReplicationCheckpoint checkpoint, final IndexShard indexShard, PrimaryShardReplicationSource source, final ReplicationListener listener) { final long replicationId = onGoingReplications.startReplication(checkpoint, indexShard, source, listener, recoverySettings.activityTimeout()); + logger.info("Starting replication {}", replicationId); threadPool.generic().execute(new ReplicationRunner(replicationId)); } @@ -144,11 +193,11 @@ public void onResponse(ReplicationResponse replicationResponse) { // final TimeValue replicationTime = new TimeValue(timer.time()); logger.trace("Replication complete {}", replicationId); onGoingReplications.markReplicationAsDone(replicationId); - shard.finalizeReplication(); } @Override public void onFailure(Exception e) { + logger.error("Error", e); if (logger.isTraceEnabled()) { logger.trace( () -> new ParameterizedMessage( diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java index 2bbf59821a385..111f80dc508b4 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java @@ -167,11 +167,21 @@ public void sendFiles(CopyState copyState, ActionListener list throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e); } - // TODO: This only needs to happen on the initial setup. sendFileStep.whenComplete(r -> { - runUnderPrimaryPermit(() -> shard.initiateTracking(targetAllocationId), - shardId + " initiating tracking of " + targetAllocationId, shard, cancellableThreads, logger); - + runUnderPrimaryPermit( + () -> shard.updateLocalCheckpointForShard(targetAllocationId, request.getCheckpoint().getSeqNo()), + shardId + " updating local checkpoint for " + targetAllocationId, + shard, + cancellableThreads, + logger + ); + runUnderPrimaryPermit( + () -> shard.markAllocationIdAsInSync(targetAllocationId, request.getCheckpoint().getSeqNo()), + shardId + " marking " + targetAllocationId + " as in sync", + shard, + cancellableThreads, + logger + ); try { future.onResponse(new GetFilesResponse()); } finally { diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java index 95d2c723f3b25..0ff2dd4e385c3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java @@ -11,6 +11,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java index 85ef63ce3023f..a826ae40fa389 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java @@ -46,13 +46,9 @@ import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.concurrent.AbstractRefCounted; -import org.opensearch.index.engine.EngineException; -import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardRecoveryException; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.MultiFileWriter; import org.opensearch.indices.recovery.RecoveryRequestTracker; import org.opensearch.indices.recovery.RecoveryState; @@ -124,48 +120,18 @@ public ReplicationTarget(ReplicationCheckpoint checkpoint, IndexShard indexShard } public void startReplication(ActionListener listener) { - final StepListener shardStartedListener = new StepListener<>(); final StepListener checkpointInfoListener = new StepListener<>(); final StepListener getFilesListener = new StepListener<>(); final StepListener finalizeListener = new StepListener<>(); - ensureShardStarted(shardStartedListener); - // Get list of files to copy from this checkpoint. - shardStartedListener.whenComplete(r -> source.getCheckpointInfo(replicationId, checkpoint, checkpointInfoListener), listener::onFailure); + source.getCheckpointInfo(replicationId, checkpoint, checkpointInfoListener); checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure); getFilesListener.whenComplete(response -> finalizeReplication(checkpointInfoListener.result(), finalizeListener), listener::onFailure); finalizeListener.whenComplete(r -> listener.onResponse(new ReplicationResponse()), listener::onFailure); } - private void ensureShardStarted(StepListener shardStartedListener) { - if (indexShard.recoveryState().getStage() == RecoveryState.Stage.INIT) { - setupReplicaShard(indexShard, shardStartedListener); - } else { - shardStartedListener.onResponse(true); - } - } - - private void setupReplicaShard(IndexShard indexShard, StepListener shardStartedListener) throws IndexShardRecoveryException { - indexShard.prepareForIndexRecovery(); - final Store store = indexShard.store(); - store.incRef(); - try { - store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); - final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, indexShard.shardId(), - indexShard.getPendingPrimaryTerm()); - store.associateIndexWithNewTranslog(translogUUID); - indexShard.persistRetentionLeases(); - indexShard.openEngineAndRecoverFromTranslog(); - shardStartedListener.onResponse(true); - } catch (EngineException | IOException e) { - throw new IndexShardRecoveryException(indexShard.shardId(), "failed to start replica shard", e); - } finally { - store.decRef(); - } - } public Store store() { ensureRefCount(); diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java index 979a136795151..7b4693f177bb7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java @@ -37,7 +37,9 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ChannelActionListener; import org.opensearch.common.inject.Inject; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.IndexService; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; @@ -65,6 +67,7 @@ public class SegmentReplicationPrimaryService { public static class Actions { public static final String GET_CHECKPOINT_INFO = "internal:index/shard/segrep/checkpoint_info"; public static final String GET_FILES = "internal:index/shard/segrep/get_files"; + public static final String TRACK_SHARD = "internal:index/shard/segrep/track_shard"; } private final TransportService transportService; @@ -93,6 +96,13 @@ public SegmentReplicationPrimaryService(TransportService transportService, Indic GetFilesRequest::new, new GetFilesRequestHandler() ); + + transportService.registerRequestHandler( + Actions.TRACK_SHARD, + ThreadPool.Names.GENERIC, + TrackShardRequest::new, + new TrackShardRequestHandler() + ); } private static final class CopyStateCache { @@ -103,6 +113,7 @@ public void addCopyState(CopyState copyState) { } public CopyState getCopyStateForCheckpoint(ReplicationCheckpoint checkpoint) { + // TODO: We should incref here and return from StartReplicationHandler... return checkpointCopyState.get(checkpoint); } @@ -127,6 +138,7 @@ public void messageReceived(StartReplicationRequest request, TransportChannel ch final ShardId shardId = checkpoint.getShardId(); final IndexService indexService = indicesService.indexService(shardId.getIndex()); final IndexShard shard = indexService.getShard(shardId.id()); + // If we don't have the requested checkpoint, create a new one from the latest commit on the shard. // TODO: Segrep - need checkpoint validation. CopyState nrtCopyState = new CopyState(shard); @@ -139,7 +151,7 @@ class GetFilesRequestHandler implements TransportRequestHandler @Override public void messageReceived(GetFilesRequest request, TransportChannel channel, Task task) throws Exception { if (commitCache.hasCheckpoint(request.getCheckpoint())) { - sendFiles(request, new ChannelActionListener<>(channel, Actions.GET_FILES, request)); + sendFiles(request, new ChannelActionListener<>(channel, Actions.GET_FILES, request)); } else { channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -150,35 +162,67 @@ private void sendFiles(GetFilesRequest request, ActionListener final ShardId shardId = request.getCheckpoint().getShardId(); logger.trace("Requested checkpoint {}", request.getCheckpoint()); - final CopyState copyState = commitCache.getCopyStateForCheckpoint(request.getCheckpoint()); + final CopyState copyState = commitCache.getCopyStateForCheckpoint(request.getCheckpoint()); + + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard shard = indexService.getShard(shardId.id()); + final ReplicaClient replicationTargetHandler = new ReplicaClient( + shardId, + transportService, + request.getTargetNode(), + recoverySettings, + throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime) + ); + PrimaryShardReplicationHandler handler = new PrimaryShardReplicationHandler( + request.getReplicationId(), + shard, + request.getTargetNode(), + request.getTargetAllocationId(), + replicationTargetHandler, + shard.getThreadPool(), + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks(), + recoverySettings.getMaxConcurrentOperations()); + logger.info( + "[{}][{}] fetching files for {}", + shardId.getIndex().getName(), + shardId.id(), + request.getTargetNode() + ); + // TODO: The calling shard could die between requests without finishing. + handler.sendFiles(copyState, ActionListener.runAfter(listener, () -> commitCache.removeCopyState(request.getCheckpoint()))); + } + + class TrackShardRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(TrackShardRequest request, TransportChannel channel, Task task) throws Exception { + + final ShardId shardId = request.getShardId(); + final String targetAllocationId = request.getTargetAllocationId(); final IndexService indexService = indicesService.indexService(shardId.getIndex()); final IndexShard shard = indexService.getShard(shardId.id()); - final ReplicaClient replicationTargetHandler = new ReplicaClient( - shardId, - transportService, - request.getTargetNode(), - recoverySettings, - throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime) + + PrimaryShardReplicationHandler.runUnderPrimaryPermit(() -> shard.initiateTracking(targetAllocationId), + shardId + " initiating tracking of " + targetAllocationId, shard, new CancellableThreads(), logger); + + PrimaryShardReplicationHandler.runUnderPrimaryPermit( + () -> shard.updateLocalCheckpointForShard(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED), + shardId + " marking " + targetAllocationId + " as in sync", + shard, + new CancellableThreads(), + logger ); - PrimaryShardReplicationHandler handler = new PrimaryShardReplicationHandler( - request.getReplicationId(), + PrimaryShardReplicationHandler.runUnderPrimaryPermit( + () -> shard.markAllocationIdAsInSync(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED), + shardId + " marking " + targetAllocationId + " as in sync", shard, - request.getTargetNode(), - request.getTargetAllocationId(), - replicationTargetHandler, - shard.getThreadPool(), - request, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - recoverySettings.getMaxConcurrentFileChunks(), - recoverySettings.getMaxConcurrentOperations()); - logger.info( - "[{}][{}] fetching files for {}", - shardId.getIndex().getName(), - shardId.id(), - request.getTargetNode() + new CancellableThreads(), + logger ); - // TODO: The calling shard could die between requests without finishing. - handler.sendFiles(copyState, ActionListener.runAfter(listener, () -> commitCache.removeCopyState(request.getCheckpoint()))); + + channel.sendResponse(new TrackShardResponse()); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/TrackShardRequest.java b/server/src/main/java/org/opensearch/indices/replication/copy/TrackShardRequest.java new file mode 100644 index 0000000000000..954352de3791d --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/copy/TrackShardRequest.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.copy; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; + +public class TrackShardRequest extends SegmentReplicationTransportRequest { + private ShardId shardId; + + public TrackShardRequest(StreamInput in) throws IOException { + super(in); + shardId = new ShardId(in); + } + + public TrackShardRequest(ShardId shardId, String targetAllocationId, DiscoveryNode discoveryNode) { + super(-1, targetAllocationId, discoveryNode); + this.shardId = shardId; + } + + public ShardId getShardId() { + return shardId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/TrackShardResponse.java b/server/src/main/java/org/opensearch/indices/replication/copy/TrackShardResponse.java new file mode 100644 index 0000000000000..2e590a40c2477 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/copy/TrackShardResponse.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.copy; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; + +public class TrackShardResponse extends TransportResponse { + + public TrackShardResponse() {} + + public TrackShardResponse(StreamInput streamInput) {} + + @Override + public void writeTo(StreamOutput out) throws IOException { + + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 53c322aca7bfe..018ce8b1913b0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -931,7 +931,7 @@ protected Node( } b.bind(SegmentReplicationPrimaryService.class) .toInstance(new SegmentReplicationPrimaryService(transportService, indicesService, recoverySettings)); - final SegmentReplicationService segmentReplicationService = new SegmentReplicationService(threadPool, recoverySettings); + final SegmentReplicationService segmentReplicationService = new SegmentReplicationService(threadPool, recoverySettings, transportService); b.bind(SegmentReplicationService.class).toInstance(segmentReplicationService); b.bind(PrimaryShardReplicationSource.class).toInstance(new PrimaryShardReplicationSource(transportService, clusterService, indicesService, recoverySettings, segmentReplicationService)); b.bind(HttpServerTransport.class).toInstance(httpServerTransport);