diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index e8fce8fd31453..7c869ccb89a2a 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -32,6 +32,10 @@ package org.opensearch.backwards; import org.apache.http.HttpHost; +import org.apache.http.ParseException; + + +import org.apache.http.util.EntityUtils; import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.client.Request; @@ -45,6 +49,7 @@ import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.common.xcontent.support.XContentMapValues; import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.rest.RestStatus; import org.opensearch.test.rest.OpenSearchRestTestCase; import org.opensearch.test.rest.yaml.ObjectPath; @@ -97,6 +102,103 @@ private int indexDocWithConcurrentUpdates(String index, final int docId, int nUp return nUpdates + 1; } + private void printClusterRouting() throws IOException, ParseException { + Request clusterStateRequest = new Request("GET", "_cluster/state/routing_nodes?pretty"); + String clusterState = EntityUtils.toString(client().performRequest(clusterStateRequest).getEntity()).trim(); + logger.info("cluster nodes: {}", clusterState); + } + + /** + * This test verifies that segment replication does not break when primary shards are on lower OS version. It does this + * by verifying replica shards contains same number of documents as primary's. + * + * @throws Exception + */ + public void testIndexingWithPrimaryOnBwcNodes() throws Exception { + Nodes nodes = buildNodeAndVersions(); + assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); + logger.info("cluster discovered:\n {}", nodes.toString()); + final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); + final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); + // Exclude bwc nodes from allocation so that primaries gets allocated on current version + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.routing.allocation.include._name", bwcNames); + final String index = "test-index"; + createIndex(index, settings.build()); + ensureNoInitializingShards(); // wait for all other shard activity to finish + + int docCount = 200; + try (RestClient nodeClient = buildClient(restClientSettings(), + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + + logger.info("allowing replica shards assignment on bwc nodes"); + updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name")); + // Add replicas so that it can be assigned on higher OS version nodes. + updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2)); + + printClusterRouting(); + ensureGreen(index); + + // Index docs + indexDocs(index, 0, docCount); + + // perform a refresh + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); + + // verify replica catch up with primary + assertSeqNoOnShards(index, nodes, docCount, nodeClient); + } + } + + + /** + * This test creates a cluster with primary on older version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider}; + * replica shard allocation on lower OpenSearch version is prevented. Thus, this test though cover the use case where + * primary shard containing nodes are running on higher OS version while replicas are unassigned. + * + * @throws Exception + */ + public void testIndexingWithReplicaOnBwcNodes() throws Exception { + Nodes nodes = buildNodeAndVersions(); + assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); + logger.info("cluster discovered:\n {}", nodes.toString()); + final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); + final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); + // Exclude bwc nodes from allocation so that primaries gets allocated on current/higher version + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.routing.allocation.exclude._name", bwcNames); + final String index = "test-index"; + createIndex(index, settings.build()); + ensureNoInitializingShards(); // wait for all other shard activity to finish + printClusterRouting(); + + int docCount = 200; + try (RestClient nodeClient = buildClient(restClientSettings(), + nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + + logger.info("allowing replica shards assignment on bwc nodes"); + updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.exclude._name")); + // Add replicas so that it can be assigned on lower OS version nodes, but it doesn't work as called out in test overview + updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2)); + printClusterRouting(); + + // Index docs + indexDocs(index, 0, docCount); + + // perform a refresh + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); + + // verify replica catch up with primary + assertSeqNoOnShards(index, nodes, docCount, nodeClient); + } + } + public void testIndexVersionPropagation() throws Exception { Nodes nodes = buildNodeAndVersions(); assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index f34e5f7bc121a..af14b28ae7d7a 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -32,6 +32,8 @@ package org.opensearch.upgrades; import org.apache.http.util.EntityUtils; +import org.apache.http.ParseException; + import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.client.Request; @@ -40,12 +42,18 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.Booleans; import org.opensearch.common.settings.Settings; -import org.opensearch.rest.action.document.RestBulkAction; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.rest.yaml.ObjectPath; import java.io.IOException; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; @@ -61,7 +69,85 @@ */ public class IndexingIT extends AbstractRollingTestCase { - public void testIndexing() throws IOException { + private void printClusterNodes() throws IOException, ParseException, URISyntaxException { + Request clusterStateRequest = new Request("GET", "_nodes"); + Response response = client().performRequest(clusterStateRequest); + + ObjectPath objectPath = ObjectPath.createFromResponse(response); + Map nodesAsMap = objectPath.evaluate("nodes"); + for (String id : nodesAsMap.keySet()) { + logger.info("--> {} {} {}", + id, + objectPath.evaluate("nodes." + id + ".name"), + Version.fromString(objectPath.evaluate("nodes." + id + ".version"))); + } + response = client().performRequest(new Request("GET", "_cluster/state")); + String cm = ObjectPath.createFromResponse(response).evaluate("master_node"); + logger.info("--> Cluster manager {}", cm); + } + + // Verifies that for each shard copy holds same document count across all containing nodes. + private void waitForSearchableDocs(String index, int shardCount) throws Exception { + Map primaryShardToNodeIDMap = new HashMap<>(); + Map replicaShardToNodeIDMap = new HashMap<>(); + logger.info("--> _cat/shards \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + + Request request = new Request("GET", index + "/_stats"); + request.addParameter("level", "shards"); + Response response = client().performRequest(request); + for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) { + List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards." + shardNumber); + for (Object shard : shardStats) { + final String nodeId = ObjectPath.evaluate(shard, "routing.node"); + final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); + if (primary) { + primaryShardToNodeIDMap.putIfAbsent(shardNumber, nodeId); + } else { + replicaShardToNodeIDMap.putIfAbsent(shardNumber, nodeId); + } + } + } + logger.info("--> primaryShardToNodeIDMap {}", primaryShardToNodeIDMap); + logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap); + + for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) { + logger.info("--> Verify doc count for shard number {}", shardNumber); + Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search"); + searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); + searchTestIndexRequest.addParameter("filter_path", "hits.total"); + searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber)); + Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest); + final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total"); + logger.info("--> primaryHits {}", primaryHits); + final int shardNum = shardNumber; + // Verify replica shard doc count only when available. + if (replicaShardToNodeIDMap.get(shardNum) != null) { + assertBusy(() -> { + Request replicaRequest = new Request("POST", "/" + index + "/_search"); + replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); + replicaRequest.addParameter("filter_path", "hits.total"); + replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum)); + Response replicaResponse = client().performRequest(replicaRequest); + int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total"); + logger.info("--> ReplicaHits {}", replicaHits); + assertEquals(primaryHits, replicaHits); + }, 1, TimeUnit.MINUTES); + } + } + } + + private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException { + Request waitForStatus = new Request("GET", "/_cluster/health/" + indexName); + waitForStatus.addParameter("wait_for_status", status); + // wait for long enough that we give delayed unassigned shards to stop being delayed + waitForStatus.addParameter("timeout", "70s"); + waitForStatus.addParameter("level", "shards"); + waitForStatus.addParameter("wait_for_no_initializing_shards", "true"); + waitForStatus.addParameter("wait_for_no_relocating_shards", "true"); + client().performRequest(waitForStatus); + } + + public void testIndexing() throws IOException, ParseException { switch (CLUSTER_TYPE) { case OLD: break; @@ -147,6 +233,88 @@ public void testIndexing() throws IOException { } } + + /** + * This test verifies that during rolling upgrades the segment replication does not break when replica shards can + * be running on older codec versions. + * + * @throws Exception + */ + public void testIndexingWithSegRep() throws Exception { + if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { + logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION); + return; + } + final String indexName = "test-index-segrep"; + final int shardCount = 3; + final int replicaCount = 1; + logger.info("--> Case {}", CLUSTER_TYPE); + printClusterNodes(); + logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + switch (CLUSTER_TYPE) { + case OLD: + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"); + createIndex(indexName, settings.build()); + waitForClusterHealthWithNoShardMigration(indexName, "green"); + bulk(indexName, "_OLD", 5); + break; + case MIXED: + waitForClusterHealthWithNoShardMigration(indexName, "yellow"); + break; + case UPGRADED: + waitForClusterHealthWithNoShardMigration(indexName, "green"); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + int expectedCount; + switch (CLUSTER_TYPE) { + case OLD: + expectedCount = 5; + break; + case MIXED: + if (Booleans.parseBoolean(System.getProperty("tests.first_round"))) { + expectedCount = 5; + } else { + expectedCount = 10; + } + break; + case UPGRADED: + expectedCount = 15; + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + waitForSearchableDocs(indexName, shardCount); + assertCount(indexName, expectedCount); + + if (CLUSTER_TYPE != ClusterType.OLD) { + logger.info("--> Bulk index 5 documents"); + bulk(indexName, "_" + CLUSTER_TYPE, 5); + logger.info("--> Index one doc (to be deleted next) and verify doc count"); + Request toBeDeleted = new Request("PUT", "/" + indexName + "/_doc/to_be_deleted"); + toBeDeleted.addParameter("refresh", "true"); + toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}"); + client().performRequest(toBeDeleted); + waitForSearchableDocs(indexName, shardCount); + assertCount(indexName, expectedCount + 6); + + logger.info("--> Delete previously added doc and verify doc count"); + Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted"); + delete.addParameter("refresh", "true"); + client().performRequest(delete); + waitForSearchableDocs(indexName, shardCount); + assertCount(indexName, expectedCount + 5); + } + logger.info("--> _cat/shards post execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + } + public void testAutoIdWithOpTypeCreate() throws IOException { final String indexName = "auto_id_and_op_type_create_index"; StringBuilder b = new StringBuilder();