Skip to content

Commit

Permalink
[Segment Replication] Update bwc test to rely on segments for verific…
Browse files Browse the repository at this point in the history
…ation (#8267)

* [Segment Replication] Use _cat/segments vs index stats + _search to verify doc count

Signed-off-by: Suraj Singh <[email protected]>

Self review

Signed-off-by: Suraj Singh <[email protected]>

remove unused imports

Signed-off-by: Suraj Singh <[email protected]>

Handle 0 doc count segments

Signed-off-by: Suraj Singh <[email protected]>

* Use 1 minute timeout for assertBusy validations and comments

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored Jun 28, 2023
1 parent b594f52 commit 270abeb
Showing 1 changed file with 43 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,12 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;

/**
* Basic test that indexed documents survive the rolling restart. See
Expand Down Expand Up @@ -88,52 +84,51 @@ private void printClusterNodes() throws IOException, ParseException, URISyntaxEx
}

// Verifies that for each shard copy holds same document count across all containing nodes.
private void waitForSearchableDocs(String index, int shardCount) throws Exception {
Map<Integer,String> primaryShardToNodeIDMap = new HashMap<>();
Map<Integer,String> replicaShardToNodeIDMap = new HashMap<>();
private void waitForSearchableDocs(String index, int shardCount, int replicaCount) throws Exception {
assertTrue(shardCount > 0);
assertTrue(replicaCount > 0);
waitForClusterHealthWithNoShardMigration(index, "green");
logger.info("--> _cat/shards before search \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));

// Verify segment replication stats
verifySegmentStats(index);
Request request = new Request("GET", index + "/_stats");
request.addParameter("level", "shards");
Response response = client().performRequest(request);
for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards." + shardNumber);
for (Object shard : shardStats) {
final String nodeId = ObjectPath.evaluate(shard, "routing.node");
final Boolean primary = ObjectPath.evaluate(shard, "routing.primary");
if (primary) {
primaryShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);
} else {
replicaShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);

// Verify segment store
assertBusy(() -> {
/**
* Use default tabular output and sort response based on shard,segment,primaryOrReplica columns to allow line by
* line parsing where records related to a segment (e.g. _0) are chunked together with first record belonging
* to primary while remaining *replicaCount* records belongs to replica copies
* */
Request segrepStatsRequest = new Request("GET", "/_cat/segments/" + index + "?s=shard,segment,primaryOrReplica");
segrepStatsRequest.addParameter("h", "index,shard,primaryOrReplica,segment,docs.count");
Response segrepStatsResponse = client().performRequest(segrepStatsRequest);
logger.info("--> _cat/segments response\n {}", EntityUtils.toString(segrepStatsResponse.getEntity()));
List<String> responseList = Streams.readAllLines(segrepStatsResponse.getEntity().getContent());
for (int segmentsIndex=0; segmentsIndex < responseList.size();) {
String[] primaryRow = responseList.get(segmentsIndex++).split(" +");
String shardId = primaryRow[0] + primaryRow[1];
assertTrue(primaryRow[2].equals("p"));
for(int replicaIndex = 1; replicaIndex <= replicaCount; replicaIndex++) {
String[] replicaRow = responseList.get(segmentsIndex).split(" +");
String replicaShardId = replicaRow[0] + replicaRow[1];
// When segment has 0 doc count, not all replica copies posses that segment. Skip to next segment
if (replicaRow[2].equals("p")) {
assertTrue(primaryRow[4].equals("0"));
break;
}
// verify same shard id
assertTrue(replicaShardId.equals(shardId));
// verify replica row
assertTrue(replicaRow[2].equals("r"));
// Verify segment name matches e.g. _0
assertTrue(replicaRow[3].equals(primaryRow[3]));
// Verify doc count matches
assertTrue(replicaRow[4].equals(primaryRow[4]));
segmentsIndex++;
}
}
}
logger.info("--> primaryShardToNodeIDMap {}", primaryShardToNodeIDMap);
logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap);

for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search");
searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
searchTestIndexRequest.addParameter("filter_path", "hits.total");
searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber));
Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest);
final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total");
final int shardNum = shardNumber;
// Verify replica shard doc count only when available.
if (replicaShardToNodeIDMap.get(shardNum) != null) {
assertBusy(() -> {
Request replicaRequest = new Request("POST", "/" + index + "/_search");
replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
replicaRequest.addParameter("filter_path", "hits.total");
replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum));
Response replicaResponse = client().performRequest(replicaRequest);
int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total");
assertEquals("Doc count mismatch for shard " + shardNum + ". Primary hits " + primaryHits + " Replica hits " + replicaHits, primaryHits, replicaHits);
}, 1, TimeUnit.MINUTES);
}
}
}, 1, TimeUnit.MINUTES);
}

private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException {
Expand All @@ -156,7 +151,7 @@ private void verifySegmentStats(String indexName) throws Exception {
String[] elements = statLine.split(" +");
assertEquals("Replica shard " + elements[0] + "not upto date with primary ", 0, Integer.parseInt(elements[2]));
}
});
}, 1, TimeUnit.MINUTES);
}

public void testIndexing() throws IOException, ParseException {
Expand Down Expand Up @@ -307,7 +302,7 @@ public void testIndexingWithSegRep() throws Exception {
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}

waitForSearchableDocs(indexName, shardCount);
waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount);

if (CLUSTER_TYPE != ClusterType.OLD) {
Expand All @@ -318,17 +313,16 @@ public void testIndexingWithSegRep() throws Exception {
toBeDeleted.addParameter("refresh", "true");
toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}");
client().performRequest(toBeDeleted);
waitForSearchableDocs(indexName, shardCount);
waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount + 6);

logger.info("--> Delete previously added doc and verify doc count");
Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted");
delete.addParameter("refresh", "true");
client().performRequest(delete);
waitForSearchableDocs(indexName, shardCount);
waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount + 5);
}
logger.info("--> _cat/shards post execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
}

public void testAutoIdWithOpTypeCreate() throws IOException {
Expand Down

0 comments on commit 270abeb

Please sign in to comment.