diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index 91be5def453fa..ff87e1ee074d8 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -905,22 +905,21 @@ private boolean invariant() {
 
         if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
-            // TODO: Segrep - This blows up during segrep because we don't have a retention lease for replicas on the primary - we can ignore for poc.
             // all tracked shard copies have a corresponding peer-recovery retention lease
-//            for (final ShardRouting shardRouting : routingTable.assignedShards()) {
-//                if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
-//                    assert retentionLeases.contains(
-//                        getPeerRecoveryRetentionLeaseId(shardRouting)
-//                    ) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
-//                    assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
-//                        retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
-//                    ) : "incorrect source ["
-//                        + retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
-//                        + "] for ["
-//                        + shardRouting
-//                        + "] in "
-//                        + retentionLeases;
-//                }
-//            }
+            for (final ShardRouting shardRouting : routingTable.assignedShards()) {
+                if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
+                    assert retentionLeases.contains(
+                        getPeerRecoveryRetentionLeaseId(shardRouting)
+                    ) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
+                    assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
+                        retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
+                    ) : "incorrect source ["
+                        + retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
+                        + "] for ["
+                        + shardRouting
+                        + "] in "
+                        + retentionLeases;
+                }
+            }
         }
 
         return true;
diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java
index 7b4693f177bb7..051d21a054f98 100644
--- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java
@@ -35,10 +35,16 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.action.ActionListener;
+import org.opensearch.action.StepListener;
 import org.opensearch.action.support.ChannelActionListener;
+import org.opensearch.action.support.ThreadedActionListener;
+import org.opensearch.action.support.replication.ReplicationResponse;
+import org.opensearch.cluster.routing.IndexShardRoutingTable;
+import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.common.inject.Inject;
 import org.opensearch.common.util.CancellableThreads;
 import org.opensearch.index.IndexService;
+import org.opensearch.index.seqno.ReplicationTracker;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.ShardId;
@@ -204,25 +210,67 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel,
             final IndexService indexService = indicesService.indexService(shardId.getIndex());
             final IndexShard shard = indexService.getShard(shardId.id());
 
-            PrimaryShardReplicationHandler.runUnderPrimaryPermit(() -> shard.initiateTracking(targetAllocationId),
-                shardId + " initiating tracking of " + targetAllocationId, shard, new CancellableThreads(), logger);
-
-            PrimaryShardReplicationHandler.runUnderPrimaryPermit(
-                () -> shard.updateLocalCheckpointForShard(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED),
-                shardId + " marking " + targetAllocationId + " as in sync",
-                shard,
-                new CancellableThreads(),
-                logger
-            );
-            PrimaryShardReplicationHandler.runUnderPrimaryPermit(
-                () -> shard.markAllocationIdAsInSync(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED),
-                shardId + " marking " + targetAllocationId + " as in sync",
-                shard,
-                new CancellableThreads(),
-                logger
-            );
-
-            channel.sendResponse(new TrackShardResponse());
+            // A table captured once is never refreshed, so fetch the routing table again on every poll.
+            IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
+            if (routingTable.getByAllocationId(targetAllocationId) == null) {
+                // TODO: Segrep - this is a hack to just wait until the primary has record of the replica's ShardRouting.
+                // We should instead throw & retry this request from the replica similar to recovery paths.
+                logger.info(
+                    "delaying startup of {} as it is not listed as assigned to target node {}",
+                    shardId,
+                    request.getTargetNode()
+                );
+                do {
+                    // Object.wait() needs the caller to hold the monitor; without it the call throws
+                    // IllegalMonitorStateException, so sleep between polls instead.
+                    Thread.sleep(10);
+                    routingTable = shard.getReplicationGroup().getRoutingTable();
+                } while (routingTable.getByAllocationId(targetAllocationId) == null);
+            }
+
+            final StepListener<ReplicationResponse> addRetentionLeaseStep = new StepListener<>();
+            PrimaryShardReplicationHandler.runUnderPrimaryPermit(
+                () -> shard.cloneLocalPeerRecoveryRetentionLease(
+                    request.getTargetNode().getId(),
+                    new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)
+                ),
+                "Add retention lease step",
+                shard,
+                new CancellableThreads(),
+                logger
+            );
+            addRetentionLeaseStep.whenComplete(r -> {
+                PrimaryShardReplicationHandler.runUnderPrimaryPermit(
+                    () -> shard.initiateTracking(targetAllocationId),
+                    shardId + " initiating tracking of " + targetAllocationId,
+                    shard,
+                    new CancellableThreads(),
+                    logger
+                );
+                PrimaryShardReplicationHandler.runUnderPrimaryPermit(
+                    () -> shard.updateLocalCheckpointForShard(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED),
+                    shardId + " updating local checkpoint of " + targetAllocationId,
+                    shard,
+                    new CancellableThreads(),
+                    logger
+                );
+                PrimaryShardReplicationHandler.runUnderPrimaryPermit(
+                    () -> shard.markAllocationIdAsInSync(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED),
+                    shardId + " marking " + targetAllocationId + " as in sync",
+                    shard,
+                    new CancellableThreads(),
+                    logger
+                );
+                channel.sendResponse(new TrackShardResponse());
+            }, e -> {
+                // Report the failure to the requesting node; otherwise it never receives any response.
+                try {
+                    channel.sendResponse(e);
+                } catch (Exception inner) {
+                    inner.addSuppressed(e);
+                    logger.warn("failed to send failure response for tracking of " + targetAllocationId + " on " + shardId, inner);
+                }
+            });
         }
     }
 }