Skip to content

Commit

Permalink
Segment Replication - Remove redundant replica doc parsing on writes.
Browse files Browse the repository at this point in the history
This change removes unnecessary doc parsing currently performed on replicas by
updating applyIndexOperationOnReplicas to pass a doc id from the primary.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Apr 23, 2023
1 parent 115bb30 commit 2e2ba83
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ private static Engine.Result performOpOnReplica(
indexRequest.routing()
);
result = replica.applyIndexOperationOnReplica(
primaryResponse.getId(),
primaryResponse.getSeqNo(),
primaryResponse.getPrimaryTerm(),
primaryResponse.getVersion(),
Expand Down
33 changes: 33 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -911,13 +911,31 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
}

public Engine.IndexResult applyIndexOperationOnReplica(
String id,
long seqNo,
long opPrimaryTerm,
long version,
long autoGeneratedTimeStamp,
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
if (indexSettings.isSegRepEnabled()) {
Engine.Index index = new Engine.Index(
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getXContentType(), null),
seqNo,
opPrimaryTerm,
version,
null,
Engine.Operation.Origin.REPLICA,
System.nanoTime(),
autoGeneratedTimeStamp,
isRetry,
UNASSIGNED_SEQ_NO,
0
);
return getEngine().index(index);
}
return applyIndexOperation(
getEngine(),
seqNo,
Expand Down Expand Up @@ -1128,6 +1146,21 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(
}

public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id) throws IOException {
if (indexSettings.isSegRepEnabled()) {
final Engine.Delete delete = new Engine.Delete(
id,
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
seqNo,
opPrimaryTerm,
version,
null,
Engine.Operation.Origin.REPLICA,
System.nanoTime(),
UNASSIGNED_SEQ_NO,
0
);
return getEngine().delete(delete);
}
return applyDeleteOperation(
getEngine(),
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {
final IndexShard remainingReplica = shards.getReplicas().get(1);
// slip the extra document into the replica
remainingReplica.applyIndexOperationOnReplica(
"id",
remainingReplica.getLocalCheckpoint() + 1,
remainingReplica.getOperationPrimaryTerm(),
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -2297,6 +2298,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "id");
shard.getEngine().translogManager().rollTranslogGeneration(); // isolate the delete in its own generation
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
0,
primaryTerm,
1,
Expand All @@ -2305,6 +2307,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), XContentType.JSON)
);
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
3,
primaryTerm,
3,
Expand All @@ -2315,6 +2318,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
// Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery.
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
2,
primaryTerm,
3,
Expand All @@ -2323,6 +2327,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), XContentType.JSON)
);
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
5,
primaryTerm,
1,
Expand Down Expand Up @@ -2470,6 +2475,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException {
updateMappings(otherShard, shard.indexSettings().getIndexMetadata());
SourceToParse sourceToParse = new SourceToParse(shard.shardId().getIndexName(), "1", new BytesArray("{}"), XContentType.JSON);
otherShard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
1,
otherShard.getOperationPrimaryTerm(),
1,
Expand Down Expand Up @@ -2597,6 +2603,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
final String indexName = shard.shardId().getIndexName();
// Index #0, index #1
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
0,
primaryTerm,
1,
Expand All @@ -2607,6 +2614,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
flushShard(shard);
shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here.
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
1,
primaryTerm,
1,
Expand All @@ -2619,6 +2627,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
shard.getEngine().translogManager().rollTranslogGeneration();
shard.markSeqNoAsNoop(1, primaryTerm, "test");
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
2,
primaryTerm,
1,
Expand Down Expand Up @@ -4009,6 +4018,7 @@ private Result indexOnReplicaWithGaps(final IndexShard indexShard, final int ope
XContentType.JSON
);
indexShard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
i,
indexShard.getOperationPrimaryTerm(),
1,
Expand Down Expand Up @@ -4633,6 +4643,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
seqNo++; // create gaps in sequence numbers
}
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
seqNo,
shard.getOperationPrimaryTerm(),
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.CheckpointInfoResponse;
Expand Down Expand Up @@ -176,6 +178,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON));
}

assertEqualTranslogOperations(shards, primaryShard);
primaryShard.refresh("Test");
replicateSegments(primaryShard, shards.getReplicas());

Expand All @@ -189,7 +192,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
);
}
}

assertEqualTranslogOperations(shards, primaryShard);
primaryShard.refresh("Test");
replicateSegments(primaryShard, shards.getReplicas());
shards.assertAllEqual(numDocs);
Expand All @@ -204,6 +207,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
shards.delete(new DeleteRequest(index.getName()).id(String.valueOf(i)));
}
}
assertEqualTranslogOperations(shards, primaryShard);
primaryShard.refresh("Test");
replicateSegments(primaryShard, shards.getReplicas());
final List<DocIdSeqNoAndSource> docsAfterDelete = getDocIdAndSeqNos(primaryShard);
Expand Down Expand Up @@ -753,6 +757,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
for (IndexShard shard : shards.getReplicas()) {
assertDocCounts(shard, numDocs, numDocs);
}
assertEqualTranslogOperations(shards, oldPrimary);

// 2. Create ops that are in the replica's xlog, not in the index.
// index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs
Expand All @@ -761,6 +766,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
final int totalDocs = numDocs + additonalDocs;

assertDocCounts(oldPrimary, totalDocs, totalDocs);
assertEqualTranslogOperations(shards, oldPrimary);
for (IndexShard shard : shards.getReplicas()) {
assertDocCounts(shard, totalDocs, numDocs);
}
Expand Down Expand Up @@ -1083,4 +1089,20 @@ private void assertEqualCommittedSegments(IndexShard primary, IndexShard... repl
assertTrue(diff.missing.isEmpty());
}
}

/**
 * Asserts that every replica in the group holds the same set of translog operations
 * as the given primary shard, ignoring operation order.
 *
 * <p>Reads all operations from the primary's translog snapshot, then for each replica
 * opens its own snapshot and matches it against the collected operations with
 * {@link SnapshotMatchers#containsOperationsInAnyOrder}.
 *
 * @param shards       the replication group whose replicas are checked
 * @param primaryShard the primary shard providing the expected operations
 * @throws IOException if reading a translog snapshot fails
 */
private void assertEqualTranslogOperations(ReplicationGroup shards, IndexShard primaryShard) throws IOException {
    try (final Translog.Snapshot snapshot = getTranslog(primaryShard).newSnapshot()) {
        final List<Translog.Operation> operations = new ArrayList<>();
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            // collect the primary's operations; no copy needed, snapshot yields distinct objects
            operations.add(op);
        }
        for (IndexShard replica : shards.getReplicas()) {
            try (final Translog.Snapshot replicaSnapshot = getTranslog(replica).newSnapshot()) {
                assertThat(replicaSnapshot, SnapshotMatchers.containsOperationsInAnyOrder(operations));
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -182,6 +183,7 @@ private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
Randomness.shuffle(seqNos);
for (long seqNo : seqNos) {
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
seqNo,
1,
shard.getOperationPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -181,6 +182,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
// index #0
orgReplica.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
0,
primaryTerm,
1,
Expand All @@ -190,6 +192,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
);
// index #3
orgReplica.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
3,
primaryTerm,
1,
Expand All @@ -201,6 +204,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
// index #2
orgReplica.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
2,
primaryTerm,
1,
Expand All @@ -212,6 +216,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
// index #5 -> force NoOp #4.
orgReplica.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
5,
primaryTerm,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1122,6 +1123,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source
final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates
result = shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
seqNo,
shard.getOperationPrimaryTerm(),
0,
Expand Down

0 comments on commit 2e2ba83

Please sign in to comment.