diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java
index ed4bf11041c88..4d965ce4ff8fe 100644
--- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java
+++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java
@@ -31,22 +31,34 @@
 package org.opensearch.upgrades;
 
+import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.ParseException;
 import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.LegacyESVersion;
 import org.opensearch.Version;
+import org.opensearch.action.search.SearchResponse;
 import org.opensearch.client.Request;
 import org.opensearch.client.Response;
 import org.opensearch.client.ResponseException;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.Booleans;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.index.seqno.SeqNoStats;
+import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.rest.action.document.RestBulkAction;
+import org.opensearch.test.rest.yaml.ObjectPath;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
 import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.either;
@@ -62,6 +74,81 @@
  */
 public class IndexingIT extends AbstractRollingTestCase {
 
+    private void printClusterNodes() throws IOException, ParseException, URISyntaxException {
+        Request clusterStateRequest = new Request("GET", "_nodes");
+        Response response = client().performRequest(clusterStateRequest);
+
+        ObjectPath objectPath = ObjectPath.createFromResponse(response);
+        Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
+        for (String id : nodesAsMap.keySet()) {
+            logger.info("--> {} {} {}",
+                id,
+                objectPath.evaluate("nodes." + id + ".name"),
+                Version.fromString(objectPath.evaluate("nodes." + id + ".version")));
+        }
+        response = client().performRequest(new Request("GET", "_cluster/state"));
+        String cm = ObjectPath.createFromResponse(response).evaluate("master_node");
+        logger.info("--> Cluster manager {}", cm);
+    }
+
+    // Verifies that, for each shard, all copies (primary and replica) hold the same document count.
+    private void waitForSearchableDocs(String index, int shardCount) throws Exception {
+        Map<Integer, String> primaryShardToNodeIDMap = new HashMap<>();
+        Map<Integer, String> replicaShardToNodeIDMap = new HashMap<>();
+        logger.info("--> _cat/shards \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
+
+        Request request = new Request("GET", index + "/_stats");
+        request.addParameter("level", "shards");
+        Response response = client().performRequest(request);
+        for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
+            List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards." + shardNumber);
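+            // Each stats entry describes one shard copy; record which node hosts the primary and which node hosts the replica.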
+            for (Object shard : shardStats) {
+                final String nodeId = ObjectPath.evaluate(shard, "routing.node");
+                final Boolean primary = ObjectPath.evaluate(shard, "routing.primary");
+                if (primary) {
+                    primaryShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);
+                } else {
+                    replicaShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);
+                }
+            }
+        }
+        logger.info("--> primaryShardToNodeIDMap {}", primaryShardToNodeIDMap);
+        logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap);
+
+        for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
+            logger.info("--> Verify doc count for shard number {}", shardNumber);
+            Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search");
+            searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
+            searchTestIndexRequest.addParameter("filter_path", "hits.total");
+            searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber));
+            Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest);
+            final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total");
+            logger.info("--> primaryHits {}", primaryHits);
+            final int shardNum = shardNumber;
+            assertBusy(() -> {
+                Request replicaRequest = new Request("POST", "/" + index + "/_search");
+                replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
+                replicaRequest.addParameter("filter_path", "hits.total");
+                replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum));
+                Response replicaResponse = client().performRequest(replicaRequest);
+                int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total");
+                logger.info("--> ReplicaHits {}", replicaHits);
+                assertEquals(primaryHits, replicaHits);
+            }, 1, TimeUnit.MINUTES);
+        }
+    }
+
+    private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException {
+        Request waitForStatus = new Request("GET", "/_cluster/health/" + indexName);
+        waitForStatus.addParameter("wait_for_status", status);
+        // wait long enough for delayed unassigned shards to be reassigned; the delayed allocation timeout defaults to 1m
+        waitForStatus.addParameter("timeout", "70s");
+        waitForStatus.addParameter("level", "shards");
+        waitForStatus.addParameter("wait_for_no_initializing_shards", "true");
+        waitForStatus.addParameter("wait_for_no_relocating_shards", "true");
+        client().performRequest(waitForStatus);
+    }
+
     public void testIndexing() throws IOException, ParseException {
         switch (CLUSTER_TYPE) {
             case OLD:
@@ -148,6 +235,83 @@ public void testIndexing() throws IOException, ParseException {
         }
     }
 
+
+    /**
+     * This test verifies that segment replication does not break during a rolling upgrade, when replica shards may be
+     * running on older codec versions than their primaries.
+     *
+     * @throws Exception
+     */
+    public void testIndexingWithSegRep() throws Exception {
+        final String indexName = "test-index-segrep";
+        final int shardCount = 3;
+        final int replicaCount = 1;
+        logger.info("--> Case {}", CLUSTER_TYPE);
+        printClusterNodes();
+        logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
+        switch (CLUSTER_TYPE) {
+            case OLD:
+                Settings.Builder settings = Settings.builder()
+                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount)
+                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount)
+                    .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+                    .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms");
+                createIndex(indexName, settings.build());
+                waitForClusterHealthWithNoShardMigration(indexName, "green");
+                bulk(indexName, "_OLD", 5);
+                break;
+            case MIXED:
+                waitForClusterHealthWithNoShardMigration(indexName, "yellow");
+                break;
+            case UPGRADED:
+                waitForClusterHealthWithNoShardMigration(indexName, "green");
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
+        }
+
+        int expectedCount;
+        switch (CLUSTER_TYPE) {
+            case OLD:
+                expectedCount = 5;
+                break;
+            case MIXED:
+                if (Booleans.parseBoolean(System.getProperty("tests.first_round"))) {
+                    expectedCount = 5;
+                } else {
+                    expectedCount = 10;
+                }
+                break;
+            case UPGRADED:
+                expectedCount = 15;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
+        }
+
+        waitForSearchableDocs(indexName, shardCount);
+        assertCount(indexName, expectedCount);
+
+        if (CLUSTER_TYPE != ClusterType.OLD) {
+            logger.info("--> Index one doc (to be deleted next) and verify doc count");
+            bulk(indexName, "_" + CLUSTER_TYPE, 5);
+            Request toBeDeleted = new Request("PUT", "/" + indexName + "/_doc/to_be_deleted");
+            toBeDeleted.addParameter("refresh", "true");
+            toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}");
+            client().performRequest(toBeDeleted);
+            waitForSearchableDocs(indexName, shardCount);
+            assertCount(indexName, expectedCount + 6);
+
+            logger.info("--> Delete previously added doc and verify doc count");
+            Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted");
+            delete.addParameter("refresh", "true");
+            client().performRequest(delete);
+            waitForSearchableDocs(indexName, shardCount);
+            assertCount(indexName, expectedCount + 5);
+        }
+        logger.info("--> _cat/shards post execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
+    }
+
     public void testAutoIdWithOpTypeCreate() throws IOException {
         final String indexName = "auto_id_and_op_type_create_index";
         StringBuilder b = new StringBuilder();