Commit dac3a4a
Fix retention lease invariant in ReplicationTracker.
To satisfy the invariant that every tracked shard copy has a corresponding
peer-recovery retention lease, this change updates the TRACK_SHARD action to
clone the primary's retention lease and use it as the replica's.

Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Feb 18, 2022
1 parent 9728020 commit dac3a4a
Showing 2 changed files with 65 additions and 29 deletions.
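
The fix in one idea: the invariant below requires every tracked shard copy to
hold a peer-recovery retention lease, and TRACK_SHARD now satisfies it by
copying the primary's own lease under the replica's lease id. A minimal,
self-contained model of that idea — plain Java with invented names, not the
real OpenSearch classes:

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of "clone the primary's retention lease for the replica".
    public class CloneLeaseSketch {
        record RetentionLease(String id, long retainingSeqNo, String source) {}

        static final Map<String, RetentionLease> leases = new HashMap<>();

        // Roughly mirrors ReplicationTracker.getPeerRecoveryRetentionLeaseId: one lease id per node.
        static String leaseIdFor(String nodeId) {
            return "peer_recovery/" + nodeId;
        }

        // The replica's lease is a copy of the primary's, re-keyed to the replica's node:
        // same retained sequence number, so the replica's history is protected from the start.
        static RetentionLease cloneForReplica(String primaryNodeId, String replicaNodeId) {
            RetentionLease primary = leases.get(leaseIdFor(primaryNodeId));
            RetentionLease clone = new RetentionLease(leaseIdFor(replicaNodeId), primary.retainingSeqNo(), primary.source());
            leases.put(clone.id(), clone);
            return clone;
        }

        public static void main(String[] args) {
            leases.put(leaseIdFor("node-1"), new RetentionLease(leaseIdFor("node-1"), 42L, "peer recovery"));
            System.out.println(cloneForReplica("node-1", "node-2"));
            // -> RetentionLease[id=peer_recovery/node-2, retainingSeqNo=42, source=peer recovery]
        }
    }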
ReplicationTracker.java

@@ -905,22 +905,22 @@ private boolean invariant() {
 
         if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
             // TODO: Segrep - This blows up during segrep because we don't have a retention lease for replicas on the primary - we can ignore for poc.
-            // all tracked shard copies have a corresponding peer-recovery retention lease
-            // for (final ShardRouting shardRouting : routingTable.assignedShards()) {
-            //     if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
-            //         assert retentionLeases.contains(
-            //             getPeerRecoveryRetentionLeaseId(shardRouting)
-            //         ) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
-            //         assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
-            //             retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
-            //         ) : "incorrect source ["
-            //             + retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
-            //             + "] for ["
-            //             + shardRouting
-            //             + "] in "
-            //             + retentionLeases;
-            //     }
-            // }
+            // all tracked shard copies have a corresponding peer-recovery retention lease
+            for (final ShardRouting shardRouting : routingTable.assignedShards()) {
+                if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
+                    assert retentionLeases.contains(
+                        getPeerRecoveryRetentionLeaseId(shardRouting)
+                    ) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
+                    assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
+                        retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
+                    ) : "incorrect source ["
+                        + retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
+                        + "] for ["
+                        + shardRouting
+                        + "] in "
+                        + retentionLeases;
+                }
+            }
         }
 
         return true;
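
Reduced to plain collections, the two assertions above guarantee that every
tracked copy has a lease and that the lease was created by peer recovery. A
runnable sketch, assuming PEER_RECOVERY_RETENTION_LEASE_SOURCE is the string
"peer recovery"; node names are invented, and asserts need "java -ea":

    import java.util.Map;
    import java.util.Set;

    // What the restored invariant checks, stripped of the real RoutingTable/RetentionLeases types.
    public class InvariantSketch {
        public static void main(String[] args) {
            Set<String> trackedNodes = Set.of("node-1", "node-2"); // stand-in for the tracked shard copies
            Map<String, String> leaseSourceById = Map.of(
                "peer_recovery/node-1", "peer recovery",
                "peer_recovery/node-2", "peer recovery"
            );

            for (String nodeId : trackedNodes) {
                String leaseId = "peer_recovery/" + nodeId;
                // assertion 1: every tracked copy has a corresponding peer-recovery retention lease
                assert leaseSourceById.containsKey(leaseId) : "no retention lease for tracked shard on " + nodeId;
                // assertion 2: the lease was created by peer recovery, not some other source
                assert "peer recovery".equals(leaseSourceById.get(leaseId)) : "incorrect source for " + leaseId;
            }
            System.out.println("invariant holds for all tracked copies");
        }
    }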
Second changed file (the TRACK_SHARD handler):

@@ -35,10 +35,16 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.action.ActionListener;
+import org.opensearch.action.StepListener;
+import org.opensearch.action.support.ChannelActionListener;
+import org.opensearch.action.support.ThreadedActionListener;
+import org.opensearch.action.support.replication.ReplicationResponse;
+import org.opensearch.cluster.routing.IndexShardRoutingTable;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.common.inject.Inject;
 import org.opensearch.common.util.CancellableThreads;
 import org.opensearch.index.IndexService;
+import org.opensearch.index.seqno.ReplicationTracker;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.ShardId;
@@ -204,25 +210,55 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel,
         final IndexService indexService = indicesService.indexService(shardId.getIndex());
         final IndexShard shard = indexService.getShard(shardId.id());
 
-        PrimaryShardReplicationHandler.runUnderPrimaryPermit(() -> shard.initiateTracking(targetAllocationId),
-            shardId + " initiating tracking of " + targetAllocationId, shard, new CancellableThreads(), logger);
+        final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
 
-        PrimaryShardReplicationHandler.runUnderPrimaryPermit(
-            () -> shard.updateLocalCheckpointForShard(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED),
-            shardId + " marking " + targetAllocationId + " as in sync",
-            shard,
-            new CancellableThreads(),
-            logger
-        );
-        PrimaryShardReplicationHandler.runUnderPrimaryPermit(
-            () -> shard.markAllocationIdAsInSync(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED),
-            shardId + " marking " + targetAllocationId + " as in sync",
+        if (routingTable.getByAllocationId(targetAllocationId) == null) {
+            // TODO: Segrep - this is a hack to just wait until the primary has record of the replica's ShardRouting.
+            // We should instead throw & retry this request from the replica similar to recovery paths.
+            while (true) {
+                if (routingTable.getByAllocationId(targetAllocationId) != null) {
+                    break;
+                }
+                this.wait(10);
+            }
+            logger.info(
+                "delaying startup of {} as it is not listed as assigned to target node {}",
+                shardId,
+                request.getTargetNode()
+            );
+        }
+        final StepListener<ReplicationResponse> addRetentionLeaseStep = new StepListener<>();
+        final StepListener<ReplicationResponse> responseListener = new StepListener<>();
+        PrimaryShardReplicationHandler.runUnderPrimaryPermit(() ->
+            shard.cloneLocalPeerRecoveryRetentionLease(
+                request.getTargetNode().getId(),
+                new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)
+            ),
+            "Add retention lease step",
             shard,
             new CancellableThreads(),
             logger
         );
+        addRetentionLeaseStep.whenComplete(r -> {
+            PrimaryShardReplicationHandler.runUnderPrimaryPermit(() -> shard.initiateTracking(targetAllocationId),
+                shardId + " initiating tracking of " + targetAllocationId, shard, new CancellableThreads(), logger);
 
-        channel.sendResponse(new TrackShardResponse());
+            PrimaryShardReplicationHandler.runUnderPrimaryPermit(
+                () -> shard.updateLocalCheckpointForShard(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED),
+                shardId + " marking " + targetAllocationId + " as in sync",
+                shard,
+                new CancellableThreads(),
+                logger
+            );
+            PrimaryShardReplicationHandler.runUnderPrimaryPermit(
+                () -> shard.markAllocationIdAsInSync(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED),
+                shardId + " marking " + targetAllocationId + " as in sync",
+                shard,
+                new CancellableThreads(),
+                logger
+            );
+            channel.sendResponse(new TrackShardResponse());
+        }, responseListener::onFailure);
         }
     }
 }
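
The new handler's control flow hinges on StepListener: initiate-tracking and
the in-sync markers run only after the lease clone completes, and a failure
skips straight to responseListener::onFailure. The same pattern in plain Java,
using CompletableFuture as a stand-in for StepListener (cloneLease and
markInSync are invented names for this sketch):

    import java.util.concurrent.CompletableFuture;

    // Plain-Java analogue of the StepListener chaining above; not OpenSearch code.
    public class TrackShardFlowSketch {
        static CompletableFuture<String> cloneLease() {
            return CompletableFuture.supplyAsync(() -> "lease-cloned"); // step 1, asynchronous
        }

        static void markInSync(String leaseResult) {
            System.out.println("marking in sync after: " + leaseResult); // step 2, gated on step 1
        }

        public static void main(String[] args) {
            cloneLease()
                .thenAccept(TrackShardFlowSketch::markInSync)   // runs only if step 1 succeeds
                .exceptionally(e -> {                           // failure short-circuits, like responseListener::onFailure
                    System.err.println("track-shard failed: " + e);
                    return null;
                })
                .join();
        }
    }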
