diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index 173aa9f6557d2..93c0bc96a5183 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -48,16 +48,12 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.either; /** * Basic test that indexed documents survive the rolling restart. See @@ -88,52 +84,51 @@ private void printClusterNodes() throws IOException, ParseException, URISyntaxEx } // Verifies that for each shard copy holds same document count across all containing nodes. - private void waitForSearchableDocs(String index, int shardCount) throws Exception { - Map primaryShardToNodeIDMap = new HashMap<>(); - Map replicaShardToNodeIDMap = new HashMap<>(); + private void waitForSearchableDocs(String index, int shardCount, int replicaCount) throws Exception { + assertTrue(shardCount > 0); + assertTrue(replicaCount > 0); waitForClusterHealthWithNoShardMigration(index, "green"); logger.info("--> _cat/shards before search \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + // Verify segment replication stats verifySegmentStats(index); - Request request = new Request("GET", index + "/_stats"); - request.addParameter("level", "shards"); - Response response = client().performRequest(request); - for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) { - List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards." + shardNumber); - for (Object shard : shardStats) { - final String nodeId = ObjectPath.evaluate(shard, "routing.node"); - final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); - if (primary) { - primaryShardToNodeIDMap.putIfAbsent(shardNumber, nodeId); - } else { - replicaShardToNodeIDMap.putIfAbsent(shardNumber, nodeId); + + // Verify segment store + assertBusy(() -> { + /** + * Use default tabular output and sort response based on shard,segment,primaryOrReplica columns to allow line by + * line parsing where records related to a segment (e.g. _0) are chunked together with first record belonging + * to primary while remaining *replicaCount* records belongs to replica copies + * */ + Request segrepStatsRequest = new Request("GET", "/_cat/segments/" + index + "?s=shard,segment,primaryOrReplica"); + segrepStatsRequest.addParameter("h", "index,shard,primaryOrReplica,segment,docs.count"); + Response segrepStatsResponse = client().performRequest(segrepStatsRequest); + logger.info("--> _cat/segments response\n {}", EntityUtils.toString(segrepStatsResponse.getEntity())); + List responseList = Streams.readAllLines(segrepStatsResponse.getEntity().getContent()); + for (int segmentsIndex=0; segmentsIndex < responseList.size();) { + String[] primaryRow = responseList.get(segmentsIndex++).split(" +"); + String shardId = primaryRow[0] + primaryRow[1]; + assertTrue(primaryRow[2].equals("p")); + for(int replicaIndex = 1; replicaIndex <= replicaCount; replicaIndex++) { + String[] replicaRow = responseList.get(segmentsIndex).split(" +"); + String replicaShardId = replicaRow[0] + replicaRow[1]; + // When segment has 0 doc count, not all replica copies posses that segment. Skip to next segment + if (replicaRow[2].equals("p")) { + assertTrue(primaryRow[4].equals("0")); + break; + } + // verify same shard id + assertTrue(replicaShardId.equals(shardId)); + // verify replica row + assertTrue(replicaRow[2].equals("r")); + // Verify segment name matches e.g. _0 + assertTrue(replicaRow[3].equals(primaryRow[3])); + // Verify doc count matches + assertTrue(replicaRow[4].equals(primaryRow[4])); + segmentsIndex++; } } - } - logger.info("--> primaryShardToNodeIDMap {}", primaryShardToNodeIDMap); - logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap); - - for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) { - Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search"); - searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); - searchTestIndexRequest.addParameter("filter_path", "hits.total"); - searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber)); - Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest); - final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total"); - final int shardNum = shardNumber; - // Verify replica shard doc count only when available. - if (replicaShardToNodeIDMap.get(shardNum) != null) { - assertBusy(() -> { - Request replicaRequest = new Request("POST", "/" + index + "/_search"); - replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); - replicaRequest.addParameter("filter_path", "hits.total"); - replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum)); - Response replicaResponse = client().performRequest(replicaRequest); - int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total"); - assertEquals("Doc count mismatch for shard " + shardNum + ". Primary hits " + primaryHits + " Replica hits " + replicaHits, primaryHits, replicaHits); - }, 1, TimeUnit.MINUTES); - } - } + }, 1, TimeUnit.MINUTES); } private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException { @@ -156,7 +151,7 @@ private void verifySegmentStats(String indexName) throws Exception { String[] elements = statLine.split(" +"); assertEquals("Replica shard " + elements[0] + "not upto date with primary ", 0, Integer.parseInt(elements[2])); } - }); + }, 1, TimeUnit.MINUTES); } public void testIndexing() throws IOException, ParseException { @@ -307,7 +302,7 @@ public void testIndexingWithSegRep() throws Exception { throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); } - waitForSearchableDocs(indexName, shardCount); + waitForSearchableDocs(indexName, shardCount, replicaCount); assertCount(indexName, expectedCount); if (CLUSTER_TYPE != ClusterType.OLD) { @@ -318,17 +313,16 @@ public void testIndexingWithSegRep() throws Exception { toBeDeleted.addParameter("refresh", "true"); toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}"); client().performRequest(toBeDeleted); - waitForSearchableDocs(indexName, shardCount); + waitForSearchableDocs(indexName, shardCount, replicaCount); assertCount(indexName, expectedCount + 6); logger.info("--> Delete previously added doc and verify doc count"); Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted"); delete.addParameter("refresh", "true"); client().performRequest(delete); - waitForSearchableDocs(indexName, shardCount); + waitForSearchableDocs(indexName, shardCount, replicaCount); assertCount(indexName, expectedCount + 5); } - logger.info("--> _cat/shards post execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); } public void testAutoIdWithOpTypeCreate() throws IOException {