Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCR: Assert existing operation in following primary #34664

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public static CcrWritePrimaryResult shardOperationOnPrimary(
final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo();
if (failure.getExistingPrimaryTerm().isPresent()) {
appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
final long existingTerm = failure.getExistingPrimaryTerm().getAsLong();
final Translog.Operation appliedOp = rewriteOperationWithPrimaryTerm(sourceOp, existingTerm);
assert assertSameOperation(primary, appliedOp);
appliedOperations.add(appliedOp);
} else if (targetOp.seqNo() > primary.getGlobalCheckpoint()) {
assert false : "can't find primary_term for existing op=" + targetOp + " gcp=" + primary.getGlobalCheckpoint();
throw new IllegalStateException("can't find primary_term for existing op=" + targetOp +
Expand All @@ -153,6 +156,22 @@ public static CcrWritePrimaryResult shardOperationOnPrimary(
return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
}

private static boolean assertSameOperation(final IndexShard shard, final Translog.Operation newOp) throws IOException {
try (Translog.Snapshot snapshot = shard.newChangesSnapshot("assert-ops", newOp.seqNo(), newOp.seqNo(), false)) {
final Translog.Operation existingOp = snapshot.next();
if (existingOp != null) {
assert existingOp.equals(newOp) : "newOp[" + newOp + "] vs existingOp[" + existingOp + "]";
final Translog.Operation nextOp = snapshot.next();
assert nextOp == null : "should have exactly one operation [" + nextOp + "]";
} else {
// The existing operation may have been merged away after the actual lookup but before this assertion.
assert shard.getGlobalCheckpoint() > newOp.seqNo() :
"Operation not found op[" + newOp + "] global checkpoint[" + shard.getGlobalCheckpoint() + "]";
}
}
return true;
}

@Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
Expand Down