From 435408bb708ceccd88fff2381baabf88075c319c Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Sun, 6 Aug 2023 15:16:18 -0700 Subject: [PATCH] [Segment Replication] Remove Doc Parsing for segment replication enabled replica shard during translog replay from recovery (#9002) * Remove Doc Parsing for segment replication enabled replica shard during translog replay from recovery. Signed-off-by: Rishikesh1159 * Adding unit test to verify document is not parsed on a segment replication enabled replica shard. Signed-off-by: Rishikesh1159 * remove unnecessary unit tests and address comments. Signed-off-by: Rishikesh1159 * address comments on PR. Signed-off-by: Rishikesh1159 --------- Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 76 +++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 49 +++++++----- 2 files changed, 104 insertions(+), 21 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 72b6a0296e3bb..4b314ef1ae27b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -91,10 +91,14 @@ import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.matchQuery; +import static org.opensearch.index.query.QueryBuilders.termQuery; +import static org.opensearch.index.query.QueryBuilders.boolQuery; +import static org.opensearch.index.query.QueryBuilders.rangeQuery; import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX; import static 
org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -1349,4 +1353,76 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception { ensureGreen(INDEX_NAME); waitForSearchableDocs(2, nodes); } + + public void testIndexWhileRecoveringReplica() throws Exception { + final String primaryNode = internalCluster().startDataOnlyNode(); + assertAcked( + prepareCreate(INDEX_NAME).setMapping( + jsonBuilder().startObject() + .startObject("_routing") + .field("required", true) + .endObject() + .startObject("properties") + .startObject("online") + .field("type", "boolean") + .endObject() + .startObject("ts") + .field("type", "date") + .field("ignore_malformed", false) + .field("format", "epoch_millis") + .endObject() + .startObject("bs") + .field("type", "keyword") + .endObject() + .endObject() + .endObject() + ) + ); + ensureYellow(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); + + client().prepareIndex(INDEX_NAME) + .setId("1") + .setRouting("Y") + .setSource("online", false, "bs", "Y", "ts", System.currentTimeMillis() - 100, "type", "s") + .get(); + client().prepareIndex(INDEX_NAME) + .setId("2") + .setRouting("X") + .setSource("online", true, "bs", "X", "ts", System.currentTimeMillis() - 10000000, "type", "s") + .get(); + client().prepareIndex(INDEX_NAME) + .setId("3") + .setRouting(randomAlphaOfLength(2)) + .setSource("online", false, "ts", System.currentTimeMillis() - 100, "type", "bs") + .get(); + client().prepareIndex(INDEX_NAME) + .setId("4") + .setRouting(randomAlphaOfLength(2)) + 
.setSource("online", true, "ts", System.currentTimeMillis() - 123123, "type", "bs") + .get(); + refresh(); + ensureGreen(INDEX_NAME); + waitForSearchableDocs(4, primaryNode, replicaNode); + + SearchResponse response = client().prepareSearch(INDEX_NAME) + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) + .setQuery( + boolQuery().must(termQuery("online", true)) + .must( + boolQuery().should( + boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "bs")) + ) + .should( + boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "s")) + ) + ) + ) + .setVersion(true) + .setFrom(0) + .setSize(100) + .setExplain(true) + .get(); + assertNoFailures(response); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ace6ed56c007c..c485773695334 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -940,7 +940,8 @@ public Engine.IndexResult applyIndexOperationOnPrimary( autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, - sourceToParse + sourceToParse, + null ); } @@ -953,23 +954,6 @@ public Engine.IndexResult applyIndexOperationOnReplica( boolean isRetry, SourceToParse sourceToParse ) throws IOException { - if (indexSettings.isSegRepEnabled()) { - Engine.Index index = new Engine.Index( - new Term(IdFieldMapper.NAME, Uid.encodeId(id)), - new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null), - seqNo, - opPrimaryTerm, - version, - null, - Engine.Operation.Origin.REPLICA, - System.nanoTime(), - autoGeneratedTimeStamp, - isRetry, - UNASSIGNED_SEQ_NO, - 0 - ); - return getEngine().index(index); - } return applyIndexOperation( getEngine(), seqNo, @@ -981,7 +965,8 @@ public Engine.IndexResult applyIndexOperationOnReplica( 
autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, - sourceToParse + sourceToParse, + id ); } @@ -996,8 +981,29 @@ private Engine.IndexResult applyIndexOperation( long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, - SourceToParse sourceToParse + SourceToParse sourceToParse, + String id ) throws IOException { + + // For Segment Replication enabled replica shards we can skip parsing the documents as we directly copy segments from primary + // shard. + if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) { + Engine.Index index = new Engine.Index( + new Term(IdFieldMapper.NAME, Uid.encodeId(id)), + new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null), + seqNo, + opPrimaryTerm, + version, + null, + Engine.Operation.Origin.REPLICA, + System.nanoTime(), + autoGeneratedTimeStamp, + isRetry, + UNASSIGNED_SEQ_NO, + 0 + ); + return getEngine().index(index); + } assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ " + opPrimaryTerm + " ] > shard term [" @@ -2164,7 +2170,8 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o index.source(), MediaTypeRegistry.xContentType(index.source()), index.routing() - ) + ), + index.id() ); break; case DELETE: