From 4fb8f4263f39b81dda20ac7470caf8877c5505bb Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 11 May 2023 14:39:52 -0700 Subject: [PATCH] [Segment Replication] Added mixed cluster bwc test Signed-off-by: Suraj Singh --- build.gradle | 2 +- qa/mixed-cluster/build.gradle | 26 ++++ .../org/opensearch/backwards/IndexingIT.java | 111 ++++++++++++++++++ .../test/rest/OpenSearchRestTestCase.java | 14 +++ 4 files changed, 152 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 220f6f3daf5ac..f42c5ad910ee1 100644 --- a/build.gradle +++ b/build.gradle @@ -231,7 +231,7 @@ String bwc_tests_disabled_issue = "" /* there's no existing MacOS release, therefore disable bcw tests */ if (Os.isFamily(Os.FAMILY_MAC)) { - bwc_tests_enabled = false +// bwc_tests_enabled = false bwc_tests_disabled_issue = "https://github.com/opensearch-project/OpenSearch/issues/4173" } diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 90aeb8faadf80..4f898f79bc898 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -55,6 +55,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { } String baseName = "v${bwcVersion}" + String segRepCluster = "${baseName}segrep" /* This project runs the core REST tests against a 4 node cluster where two of the nodes has a different minor. */ @@ -65,6 +66,12 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" } + + "$segRepCluster" { + versions = [bwcVersion.toString(), project.version] + numberOfNodes = 2 + setting 'path.repo', "${buildDir}/cluster/shared/repo/${segRepCluster}" + } } tasks.register("${baseName}#mixedClusterTest", StandaloneRestIntegTestTask) { @@ -89,7 +96,26 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { onlyIf { project.bwc_tests_enabled } } + tasks.register("${baseName}#segrepMixedClusterTest", StandaloneRestIntegTestTask) { + useCluster testClusters."${segRepCluster}" + doFirst { + delete("${buildDir}/cluster/shared/repo/${segRepCluster}") + // Getting the endpoints causes a wait for the cluster + println "Test cluster endpoints are: ${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}" + println "Upgrading one node to create a mixed cluster" + testClusters."${segRepCluster}".nextNodeToNextVersion() + // Getting the endpoints causes a wait for the cluster + println "Upgrade complete, endpoints are: ${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}" + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${segRepCluster}".getName()}") + } + systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${segRepCluster}" + systemProperty 'tests.segrep_enabled', true + onlyIf { project.bwc_tests_enabled } + } + tasks.register(bwcTaskName(bwcVersion)) { dependsOn "${baseName}#mixedClusterTest" + dependsOn "${baseName}#segrepMixedClusterTest" } } diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index 2e36a352c75dd..a07c4f750b13f 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -32,6 +32,8 @@ package org.opensearch.backwards; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.client.Request; @@ -45,6 +47,7 @@ import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.common.xcontent.support.XContentMapValues; import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.rest.RestStatus; import org.opensearch.test.rest.OpenSearchRestTestCase; import org.opensearch.test.rest.yaml.ObjectPath; @@ -63,6 +66,9 @@ public class IndexingIT extends OpenSearchRestTestCase { + protected static final Boolean SEGREP_ENABLED = Boolean.parseBoolean(System.getProperty("tests.segrep_enabled")); + + private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { for (int i = 0; i < numDocs; i++) { final int id = idStart + i; @@ -98,10 +104,115 @@ private int indexDocWithConcurrentUpdates(String index, final int docId, int nUp return nUpdates + 1; } + public void printClusterRouting() throws IOException, ParseException { + Request clusterStateRequest = new Request("GET", "_cluster/state/routing_nodes?pretty"); + String clusterState = EntityUtils.toString(client().performRequest(clusterStateRequest).getEntity()).trim(); + logger.info("cluster nodes: {}", clusterState); + } + + public void printIndexSettings(String index) throws IOException, ParseException { + Request indexSettings = new Request("GET", index + "/_settings?pretty"); + String idxSettings = EntityUtils.toString(client().performRequest(indexSettings).getEntity()).trim(); + logger.info("idxSettings : {}", idxSettings); + } + + /** + * This test verifies that segment replication does not break when primary shards are on higher version. + * + * @throws Exception + */ + public void testIndexingWithPrimaryOnBwcNodes() throws Exception { + if (SEGREP_ENABLED == false) return; + Nodes nodes = buildNodeAndVersions(); + assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); + logger.info("cluster discovered:\n {}", nodes.toString()); + final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); + logger.info("--> bwc nodes {}", bwcNamesList); + final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); + // Exclude bwc nodes from allocation so that primaries gets allocated on current version + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.routing.allocation.include._name", bwcNames); + final String index = "test-index"; + createIndex(index, settings.build()); + ensureYellow(index); + printClusterRouting(); + + printIndexSettings(index); + + int docCount = 200; + try (RestClient nodeClient = buildClient(restClientSettings(), + nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + + logger.info("allowing replica shards assignment on bwc nodes"); + updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name")); + printClusterRouting(); + printIndexSettings(index); + ensureGreen(index); + + // Index docs + indexDocs(index, 0, docCount); + + // perform a refresh + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); + + // verify replica catch up with primary + assertSeqNoOnShards(index, nodes, docCount, nodeClient); + } + } + + + /** + * This test creates a cluster with primary on older version. But, replicas + * + * @throws Exception + */ + public void testIndexingWithReplicaOnBwcNodes() throws Exception { + if (SEGREP_ENABLED == false) return; + Nodes nodes = buildNodeAndVersions(); + assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); + logger.info("cluster discovered:\n {}", nodes.toString()); + final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); + logger.info("--> bwc nodes {}", bwcNamesList); + final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); + // Exclude bwc nodes from allocation so that primaries gets allocated on current version + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.routing.allocation.exclude._name", bwcNames); + final String index = "test-index"; + createIndex(index, settings.build()); + ensureYellow(index); + printClusterRouting(); + printIndexSettings(index); + + int docCount = 200; + try (RestClient nodeClient = buildClient(restClientSettings(), + nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + + logger.info("allowing replica shards assignment on bwc nodes"); + updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.exclude._name")); + printClusterRouting(); + + // Index docs + indexDocs(index, 0, docCount); + + // perform a refresh + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); + + // verify replica catch up with primary + assertSeqNoOnShards(index, nodes, docCount, nodeClient); + } + } + public void testIndexVersionPropagation() throws Exception { Nodes nodes = buildNodeAndVersions(); assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); logger.info("cluster discovered: {}", nodes.toString()); + logger.info("--> Nodes {}", nodes); final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); Settings.Builder settings = Settings.builder() diff --git a/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java b/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java index a5d2e6793f397..11df944f1644a 100644 --- a/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java @@ -918,6 +918,20 @@ public static void ensureGreen(String index) throws IOException { }); } + /** + * Checks that the specific index is yellow. + * + * @param index index to test for + **/ + public static void ensureYellow(String index) throws IOException { + ensureHealth(index, (request) -> { + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("timeout", "70s"); + request.addParameter("level", "shards"); + }); + } + protected static void ensureHealth(Consumer requestConsumer) throws IOException { ensureHealth("", requestConsumer); }