Skip to content

Commit

Permalink
[Segment Replication] Added mixed cluster bwc test
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed May 11, 2023
1 parent 20ad1d8 commit 4fb8f42
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ String bwc_tests_disabled_issue = ""

/* there's no existing MacOS release, therefore disable bcw tests */
if (Os.isFamily(Os.FAMILY_MAC)) {
bwc_tests_enabled = false
// bwc_tests_enabled = false
bwc_tests_disabled_issue = "https://github.com/opensearch-project/OpenSearch/issues/4173"
}

Expand Down
26 changes: 26 additions & 0 deletions qa/mixed-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
}

String baseName = "v${bwcVersion}"
String segRepCluster = "${baseName}segrep"

/* This project runs the core REST tests against a 4 node cluster where two of
the nodes has a different minor. */
Expand All @@ -65,6 +66,12 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {

setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
}

"$segRepCluster" {
versions = [bwcVersion.toString(), project.version]
numberOfNodes = 2
setting 'path.repo', "${buildDir}/cluster/shared/repo/${segRepCluster}"
}
}

tasks.register("${baseName}#mixedClusterTest", StandaloneRestIntegTestTask) {
Expand All @@ -89,7 +96,26 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
onlyIf { project.bwc_tests_enabled }
}

tasks.register("${baseName}#segrepMixedClusterTest", StandaloneRestIntegTestTask) {
useCluster testClusters."${segRepCluster}"
doFirst {
delete("${buildDir}/cluster/shared/repo/${segRepCluster}")
// Getting the endpoints causes a wait for the cluster
println "Test cluster endpoints are: ${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}"
println "Upgrading one node to create a mixed cluster"
testClusters."${segRepCluster}".nextNodeToNextVersion()
// Getting the endpoints causes a wait for the cluster
println "Upgrade complete, endpoints are: ${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}"
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${segRepCluster}".getName()}")
}
systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${segRepCluster}"
systemProperty 'tests.segrep_enabled', true
onlyIf { project.bwc_tests_enabled }
}

tasks.register(bwcTaskName(bwcVersion)) {
dependsOn "${baseName}#mixedClusterTest"
dependsOn "${baseName}#segrepMixedClusterTest"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
package org.opensearch.backwards;

import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.client.Request;
Expand All @@ -45,6 +47,7 @@
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.opensearch.test.rest.yaml.ObjectPath;
Expand All @@ -63,6 +66,9 @@

public class IndexingIT extends OpenSearchRestTestCase {

protected static final Boolean SEGREP_ENABLED = Boolean.parseBoolean(System.getProperty("tests.segrep_enabled"));


private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
for (int i = 0; i < numDocs; i++) {
final int id = idStart + i;
Expand Down Expand Up @@ -98,10 +104,115 @@ private int indexDocWithConcurrentUpdates(String index, final int docId, int nUp
return nUpdates + 1;
}

public void printClusterRouting() throws IOException, ParseException {
Request clusterStateRequest = new Request("GET", "_cluster/state/routing_nodes?pretty");
String clusterState = EntityUtils.toString(client().performRequest(clusterStateRequest).getEntity()).trim();
logger.info("cluster nodes: {}", clusterState);
}

public void printIndexSettings(String index) throws IOException, ParseException {
Request indexSettings = new Request("GET", index + "/_settings?pretty");
String idxSettings = EntityUtils.toString(client().performRequest(indexSettings).getEntity()).trim();
logger.info("idxSettings : {}", idxSettings);
}

/**
* This test verifies that segment replication does not break when primary shards are on higher version.
*
* @throws Exception
*/
public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
if (SEGREP_ENABLED == false) return;
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered:\n {}", nodes.toString());
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
logger.info("--> bwc nodes {}", bwcNamesList);
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
// Exclude bwc nodes from allocation so that primaries gets allocated on current version
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.routing.allocation.include._name", bwcNames);
final String index = "test-index";
createIndex(index, settings.build());
ensureYellow(index);
printClusterRouting();

printIndexSettings(index);

int docCount = 200;
try (RestClient nodeClient = buildClient(restClientSettings(),
nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

logger.info("allowing replica shards assignment on bwc nodes");
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
printClusterRouting();
printIndexSettings(index);
ensureGreen(index);

// Index docs
indexDocs(index, 0, docCount);

// perform a refresh
assertOK(client().performRequest(new Request("POST", index + "/_flush")));

// verify replica catch up with primary
assertSeqNoOnShards(index, nodes, docCount, nodeClient);
}
}


/**
* This test creates a cluster with primary on older version. But, replicas
*
* @throws Exception
*/
public void testIndexingWithReplicaOnBwcNodes() throws Exception {
if (SEGREP_ENABLED == false) return;
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered:\n {}", nodes.toString());
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
logger.info("--> bwc nodes {}", bwcNamesList);
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
// Exclude bwc nodes from allocation so that primaries gets allocated on current version
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.routing.allocation.exclude._name", bwcNames);
final String index = "test-index";
createIndex(index, settings.build());
ensureYellow(index);
printClusterRouting();
printIndexSettings(index);

int docCount = 200;
try (RestClient nodeClient = buildClient(restClientSettings(),
nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

logger.info("allowing replica shards assignment on bwc nodes");
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.exclude._name"));
printClusterRouting();

// Index docs
indexDocs(index, 0, docCount);

// perform a refresh
assertOK(client().performRequest(new Request("POST", index + "/_flush")));

// verify replica catch up with primary
assertSeqNoOnShards(index, nodes, docCount, nodeClient);
}
}

public void testIndexVersionPropagation() throws Exception {
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered: {}", nodes.toString());
logger.info("--> Nodes {}", nodes);
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
Settings.Builder settings = Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,20 @@ public static void ensureGreen(String index) throws IOException {
});
}

/**
* Checks that the specific index is yellow.
*
* @param index index to test for
**/
public static void ensureYellow(String index) throws IOException {
ensureHealth(index, (request) -> {
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
});
}

protected static void ensureHealth(Consumer<Request> requestConsumer) throws IOException {
ensureHealth("", requestConsumer);
}
Expand Down

0 comments on commit 4fb8f42

Please sign in to comment.