Skip to content

Commit

Permalink
[Segment Replication] Remove Doc Parsing for segment replication enab…
Browse files Browse the repository at this point in the history
…led replica shard during translog replay from recovery (#9002)

* Remove Doc Parsing for segment replication enabled replica shard during translog replay from recovery.

Signed-off-by: Rishikesh1159 <[email protected]>

* Adding unit test to verify document is not parsed on an segment replication enabled replica shard.

Signed-off-by: Rishikesh1159 <[email protected]>

* remove unnecessary unit tests and address comments.

Signed-off-by: Rishikesh1159 <[email protected]>

* address comments on PR.

Signed-off-by: Rishikesh1159 <[email protected]>

---------

Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 authored Aug 6, 2023
1 parent 2416d45 commit 435408b
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,14 @@
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.index.query.QueryBuilders.boolQuery;
import static org.opensearch.index.query.QueryBuilders.rangeQuery;
import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -1349,4 +1353,76 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
ensureGreen(INDEX_NAME);
waitForSearchableDocs(2, nodes);
}

public void testIndexWhileRecoveringReplica() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate(INDEX_NAME).setMapping(
jsonBuilder().startObject()
.startObject("_routing")
.field("required", true)
.endObject()
.startObject("properties")
.startObject("online")
.field("type", "boolean")
.endObject()
.startObject("ts")
.field("type", "date")
.field("ignore_malformed", false)
.field("format", "epoch_millis")
.endObject()
.startObject("bs")
.field("type", "keyword")
.endObject()
.endObject()
.endObject()
)
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();

client().prepareIndex(INDEX_NAME)
.setId("1")
.setRouting("Y")
.setSource("online", false, "bs", "Y", "ts", System.currentTimeMillis() - 100, "type", "s")
.get();
client().prepareIndex(INDEX_NAME)
.setId("2")
.setRouting("X")
.setSource("online", true, "bs", "X", "ts", System.currentTimeMillis() - 10000000, "type", "s")
.get();
client().prepareIndex(INDEX_NAME)
.setId("3")
.setRouting(randomAlphaOfLength(2))
.setSource("online", false, "ts", System.currentTimeMillis() - 100, "type", "bs")
.get();
client().prepareIndex(INDEX_NAME)
.setId("4")
.setRouting(randomAlphaOfLength(2))
.setSource("online", true, "ts", System.currentTimeMillis() - 123123, "type", "bs")
.get();
refresh();
ensureGreen(INDEX_NAME);
waitForSearchableDocs(4, primaryNode, replicaNode);

SearchResponse response = client().prepareSearch(INDEX_NAME)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(
boolQuery().must(termQuery("online", true))
.must(
boolQuery().should(
boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "bs"))
)
.should(
boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "s"))
)
)
)
.setVersion(true)
.setFrom(0)
.setSize(100)
.setExplain(true)
.get();
assertNoFailures(response);
}
}
49 changes: 28 additions & 21 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,8 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
autoGeneratedTimestamp,
isRetry,
Engine.Operation.Origin.PRIMARY,
sourceToParse
sourceToParse,
null
);
}

Expand All @@ -953,23 +954,6 @@ public Engine.IndexResult applyIndexOperationOnReplica(
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
if (indexSettings.isSegRepEnabled()) {
Engine.Index index = new Engine.Index(
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
seqNo,
opPrimaryTerm,
version,
null,
Engine.Operation.Origin.REPLICA,
System.nanoTime(),
autoGeneratedTimeStamp,
isRetry,
UNASSIGNED_SEQ_NO,
0
);
return getEngine().index(index);
}
return applyIndexOperation(
getEngine(),
seqNo,
Expand All @@ -981,7 +965,8 @@ public Engine.IndexResult applyIndexOperationOnReplica(
autoGeneratedTimeStamp,
isRetry,
Engine.Operation.Origin.REPLICA,
sourceToParse
sourceToParse,
id
);
}

Expand All @@ -996,8 +981,29 @@ private Engine.IndexResult applyIndexOperation(
long autoGeneratedTimeStamp,
boolean isRetry,
Engine.Operation.Origin origin,
SourceToParse sourceToParse
SourceToParse sourceToParse,
String id
) throws IOException {

// For Segment Replication enabled replica shards we can be skip parsing the documents as we directly copy segments from primary
// shard.
if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) {
Engine.Index index = new Engine.Index(
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
seqNo,
opPrimaryTerm,
version,
null,
Engine.Operation.Origin.REPLICA,
System.nanoTime(),
autoGeneratedTimeStamp,
isRetry,
UNASSIGNED_SEQ_NO,
0
);
return getEngine().index(index);
}
assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ "
+ opPrimaryTerm
+ " ] > shard term ["
Expand Down Expand Up @@ -2164,7 +2170,8 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
index.source(),
MediaTypeRegistry.xContentType(index.source()),
index.routing()
)
),
index.id()
);
break;
case DELETE:
Expand Down

0 comments on commit 435408b

Please sign in to comment.