diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 4a4b4648776b1..9875493b7f5bc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -136,7 +136,10 @@ public static CcrWritePrimaryResult shardOperationOnPrimary( final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo(); if (failure.getExistingPrimaryTerm().isPresent()) { - appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong())); + final long existingTerm = failure.getExistingPrimaryTerm().getAsLong(); + final Translog.Operation appliedOp = rewriteOperationWithPrimaryTerm(sourceOp, existingTerm); + assert assertSameOperation(primary, appliedOp); + appliedOperations.add(appliedOp); } else if (targetOp.seqNo() > primary.getGlobalCheckpoint()) { assert false : "can't find primary_term for existing op=" + targetOp + " gcp=" + primary.getGlobalCheckpoint(); throw new IllegalStateException("can't find primary_term for existing op=" + targetOp + @@ -153,6 +156,22 @@ public static CcrWritePrimaryResult shardOperationOnPrimary( return new CcrWritePrimaryResult(replicaRequest, location, primary, logger); } + private static boolean assertSameOperation(final IndexShard shard, final Translog.Operation newOp) throws IOException { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot("assert-ops", newOp.seqNo(), newOp.seqNo(), false)) { + final Translog.Operation existingOp = snapshot.next(); + if (existingOp != null) { + assert existingOp.equals(newOp) : "newOp[" + newOp + "] vs existingOp[" + existingOp + "]"; + final Translog.Operation nextOp = snapshot.next(); + assert nextOp == null : "should have exactly one operation [" + nextOp + "]"; + } else { + // The existing operation may have been merged away after the actual lookup but before this assertion. + assert shard.getGlobalCheckpoint() > newOp.seqNo() : + "Operation not found op[" + newOp + "] global checkpoint[" + shard.getGlobalCheckpoint() + "]"; + } + } + return true; + } + @Override protected WriteReplicaResult shardOperationOnReplica( final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {