diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 0e063ba4aa659..6af512b011653 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -857,6 +857,7 @@ private static Engine.Result performOpOnReplica( indexRequest.routing() ); result = replica.applyIndexOperationOnReplica( + primaryResponse.getId(), primaryResponse.getSeqNo(), primaryResponse.getPrimaryTerm(), primaryResponse.getVersion(), diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index e8b2c61d0f177..8cd1a182fbdfe 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -911,6 +911,7 @@ public Engine.IndexResult applyIndexOperationOnPrimary( } public Engine.IndexResult applyIndexOperationOnReplica( + String id, long seqNo, long opPrimaryTerm, long version, @@ -918,6 +919,23 @@ public Engine.IndexResult applyIndexOperationOnReplica( boolean isRetry, SourceToParse sourceToParse ) throws IOException { + if (indexSettings.isSegRepEnabled()) { + Engine.Index index = new Engine.Index( + new Term(IdFieldMapper.NAME, Uid.encodeId(id)), + new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getXContentType(), null), + seqNo, + opPrimaryTerm, + version, + null, + Engine.Operation.Origin.REPLICA, + System.nanoTime(), + autoGeneratedTimeStamp, + isRetry, + UNASSIGNED_SEQ_NO, + 0 + ); + return getEngine().index(index); + } return applyIndexOperation( getEngine(), seqNo, @@ -1128,6 +1146,21 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary( } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id) throws IOException { + if (indexSettings.isSegRepEnabled()) { + final Engine.Delete delete = new Engine.Delete( + id, + new Term(IdFieldMapper.NAME, Uid.encodeId(id)), + seqNo, + opPrimaryTerm, + version, + null, + Engine.Operation.Origin.REPLICA, + System.nanoTime(), + UNASSIGNED_SEQ_NO, + 0 + ); + return getEngine().delete(delete); + } return applyDeleteOperation( getEngine(), seqNo, diff --git a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java index 509d1f52daa0d..0b976154969dc 100644 --- a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java @@ -152,6 +152,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { final IndexShard remainingReplica = shards.getReplicas().get(1); // slip the extra document into the replica remainingReplica.applyIndexOperationOnReplica( + "id", remainingReplica.getLocalCheckpoint() + 1, remainingReplica.getOperationPrimaryTerm(), 1, diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 563b1cf566bf4..a2e1fd0f4570f 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -170,6 +170,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -2297,6 +2298,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "id"); shard.getEngine().translogManager().rollTranslogGeneration(); // isolate the delete in it's own generation shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 0, primaryTerm, 1, @@ -2305,6 +2307,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), XContentType.JSON) ); shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 3, primaryTerm, 3, @@ -2315,6 +2318,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { // Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery. shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 2, primaryTerm, 3, @@ -2323,6 +2327,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), XContentType.JSON) ); shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 5, primaryTerm, 1, @@ -2470,6 +2475,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { updateMappings(otherShard, shard.indexSettings().getIndexMetadata()); SourceToParse sourceToParse = new SourceToParse(shard.shardId().getIndexName(), "1", new BytesArray("{}"), XContentType.JSON); otherShard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 1, otherShard.getOperationPrimaryTerm(), 1, @@ -2597,6 +2603,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { final String indexName = shard.shardId().getIndexName(); // Index #0, index #1 shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 0, primaryTerm, 1, @@ -2607,6 +2614,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { flushShard(shard); shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here. shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 1, primaryTerm, 1, @@ -2619,6 +2627,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { shard.getEngine().translogManager().rollTranslogGeneration(); shard.markSeqNoAsNoop(1, primaryTerm, "test"); shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 2, primaryTerm, 1, @@ -4009,6 +4018,7 @@ private Result indexOnReplicaWithGaps(final IndexShard indexShard, final int ope XContentType.JSON ); indexShard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), i, indexShard.getOperationPrimaryTerm(), 1, @@ -4633,6 +4643,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception { seqNo++; // create gaps in sequence numbers } shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), seqNo, shard.getOperationPrimaryTerm(), 1, diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index c4db88782638f..0d95f40652523 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -38,6 +38,8 @@ import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.translog.SnapshotMatchers; +import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -176,6 +178,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); } + assertEqualTranslogOperations(shards, primaryShard); primaryShard.refresh("Test"); replicateSegments(primaryShard, shards.getReplicas()); @@ -189,7 +192,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { ); } } - + assertEqualTranslogOperations(shards, primaryShard); primaryShard.refresh("Test"); replicateSegments(primaryShard, shards.getReplicas()); shards.assertAllEqual(numDocs); @@ -204,6 +207,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { shards.delete(new DeleteRequest(index.getName()).id(String.valueOf(i))); } } + assertEqualTranslogOperations(shards, primaryShard); primaryShard.refresh("Test"); replicateSegments(primaryShard, shards.getReplicas()); final List docsAfterDelete = getDocIdAndSeqNos(primaryShard); @@ -753,6 +757,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { for (IndexShard shard : shards.getReplicas()) { assertDocCounts(shard, numDocs, numDocs); } + assertEqualTranslogOperations(shards, oldPrimary); // 2. Create ops that are in the replica's xlog, not in the index. // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs @@ -761,6 +766,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { final int totalDocs = numDocs + additonalDocs; assertDocCounts(oldPrimary, totalDocs, totalDocs); + assertEqualTranslogOperations(shards, oldPrimary); for (IndexShard shard : shards.getReplicas()) { assertDocCounts(shard, totalDocs, numDocs); } @@ -1083,4 +1089,20 @@ private void assertEqualCommittedSegments(IndexShard primary, IndexShard... repl assertTrue(diff.missing.isEmpty()); } } + + private void assertEqualTranslogOperations(ReplicationGroup shards, IndexShard primaryShard) throws IOException { + try (final Translog.Snapshot snapshot = getTranslog(primaryShard).newSnapshot()) { + List operations = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + final Translog.Operation newOp = op; + operations.add(newOp); + } + for (IndexShard replica : shards.getReplicas()) { + try (final Translog.Snapshot replicaSnapshot = getTranslog(replica).newSnapshot()) { + assertThat(replicaSnapshot, SnapshotMatchers.containsOperationsInAnyOrder(operations)); + } + } + } + } } diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 991215711f039..730d9b4215b73 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -65,6 +65,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CyclicBarrier; @@ -182,6 +183,7 @@ private SeqNoStats populateRandomData(IndexShard shard) throws IOException { Randomness.shuffle(seqNos); for (long seqNo : seqNos) { shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), seqNo, 1, shard.getOperationPrimaryTerm(), diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 3d04f808bc30c..eae070b98c4a1 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -78,6 +78,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; @@ -181,6 +182,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment // index #0 orgReplica.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 0, primaryTerm, 1, @@ -190,6 +192,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { ); // index #3 orgReplica.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 3, primaryTerm, 1, @@ -201,6 +204,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true)); // index #2 orgReplica.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 2, primaryTerm, 1, @@ -212,6 +216,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); // index #5 -> force NoOp #4. orgReplica.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), 5, primaryTerm, 1, diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 80e603bd80420..bb3b016560fa7 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -151,6 +151,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -1122,6 +1123,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1; shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates result = shard.applyIndexOperationOnReplica( + UUID.randomUUID().toString(), seqNo, shard.getOperationPrimaryTerm(), 0,