Skip to content

Commit

Permalink
[Backport] [Segment Replication] Added mixed and rolling upgrade bwc …
Browse files Browse the repository at this point in the history
…test (#7537) (#7626)

* [Segment Replication] Added mixed and rolling upgrade bwc test (#7537)

* [Segment Replication] Added mixed cluster bwc test

Signed-off-by: Suraj Singh <[email protected]>

* Remove unnecessary gradle task for segrep

Signed-off-by: Suraj Singh <[email protected]>

* Spotless fix

Signed-off-by: Suraj Singh <[email protected]>

* Spotless fix

Signed-off-by: Suraj Singh <[email protected]>

* [Segment Replication] Rolling upgrade test

Signed-off-by: Suraj Singh <[email protected]>

* PR feedback and cleanup

Signed-off-by: Suraj Singh <[email protected]>

* Verify replica doc count only when it is assigned

Signed-off-by: Suraj Singh <[email protected]>

* Remove wait for yellow cluster

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
(cherry picked from commit 0788feb)

Add version check to skip test for 1.x bwc branches

Signed-off-by: Suraj Singh <[email protected]>

* [Segment Replication] Wait for shard relocation before building node to shard allocation map (#7876)

* [Segment Replication] Wait for shard relocation before building node to shard allocation map

Signed-off-by: Suraj Singh <[email protected]>

* Remove unused imports

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>

Fix imports

Signed-off-by: Suraj Singh <[email protected]>

* [Segment Replication] Update segrep bwc tests to verify replica checkpoints and skip tests for 1.x bwc versions (#8203)

* [Segment Replication] Verify segment replication stats in bwc test

Signed-off-by: Suraj Singh <[email protected]>

* Log cleanup

Signed-off-by: Suraj Singh <[email protected]>

* Spotless check

Signed-off-by: Suraj Singh <[email protected]>

* Add version check to skip test for 1.x bwc branches

Signed-off-by: Suraj Singh <[email protected]>

* Add version check to skip test for 1.x bwc branches for mixed clusters

Signed-off-by: Suraj Singh <[email protected]>

* Add version string in build to identify bwc version

Signed-off-by: Suraj Singh <[email protected]>

* Use correct bwc version string

Signed-off-by: Suraj Singh <[email protected]>

* Address review comments from #7626

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>

[Segment Replication] Use _cat/segments vs index stats + _search to verify doc count

Signed-off-by: Suraj Singh <[email protected]>

Self review

Signed-off-by: Suraj Singh <[email protected]>

remove unused imports

Signed-off-by: Suraj Singh <[email protected]>

Handle 0 doc count segments

Signed-off-by: Suraj Singh <[email protected]>

Add missing import statement

Signed-off-by: Suraj Singh <[email protected]>

* [Segment Replication] Update bwc test to rely on segments for verification (#8267)

* [Segment Replication] Use _cat/segments vs index stats + _search to verify doc count

Signed-off-by: Suraj Singh <[email protected]>

Self review

Signed-off-by: Suraj Singh <[email protected]>

remove unused imports

Signed-off-by: Suraj Singh <[email protected]>

Handle 0 doc count segments

Signed-off-by: Suraj Singh <[email protected]>

* Use 1 minute timeout for assertBusy validations and comments

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>

Add missing import statement

Signed-off-by: Suraj Singh <[email protected]>

* Fix missing import

Signed-off-by: Suraj Singh <[email protected]>

* Logger change

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored Jul 13, 2023
1 parent 3d77004 commit 90d1da2
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 2 deletions.
2 changes: 2 additions & 0 deletions qa/mixed-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
}

String baseName = "v${bwcVersion}"
String bwcVersionStr = "${bwcVersion}"

/* This project runs the core REST tests against a 4 node cluster where two of
the nodes has a different minor. */
Expand Down Expand Up @@ -88,6 +89,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
}
systemProperty 'tests.upgrade_from_version', bwcVersionStr
systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
onlyIf { project.bwc_tests_enabled }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
package org.opensearch.backwards;

import org.apache.http.HttpHost;
import org.apache.http.ParseException;


import org.apache.http.util.EntityUtils;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.client.Request;
Expand All @@ -45,6 +49,7 @@
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.opensearch.test.rest.yaml.ObjectPath;
Expand All @@ -62,6 +67,9 @@

public class IndexingIT extends OpenSearchRestTestCase {

protected static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));


private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
for (int i = 0; i < numDocs; i++) {
final int id = idStart + i;
Expand Down Expand Up @@ -97,6 +105,111 @@ private int indexDocWithConcurrentUpdates(String index, final int docId, int nUp
return nUpdates + 1;
}

private void printClusterRouting() throws IOException, ParseException {
Request clusterStateRequest = new Request("GET", "_cluster/state/routing_nodes?pretty");
String clusterState = EntityUtils.toString(client().performRequest(clusterStateRequest).getEntity()).trim();
logger.info("cluster nodes: {}", clusterState);
}

/**
* This test verifies that segment replication does not break when primary shards are on lower OS version. It does this
* by verifying replica shards contains same number of documents as primary's.
*
* @throws Exception
*/
public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
return;
}
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered:\n {}", nodes.toString());
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
// Update allocation settings so that primaries gets allocated only on nodes running on older version
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.routing.allocation.include._name", bwcNames);
final String index = "test-index";
createIndex(index, settings.build());
ensureNoInitializingShards(); // wait for all other shard activity to finish

int docCount = 200;
try (RestClient nodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

logger.info("Remove allocation include settings so that shards can be allocated on current version nodes");
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
// Add replicas so that it can be assigned on higher OS version nodes.
updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2));

printClusterRouting();
ensureGreen(index);

// Index docs
indexDocs(index, 0, docCount);

// perform a refresh
assertOK(client().performRequest(new Request("POST", index + "/_flush")));

// verify replica catch up with primary
assertSeqNoOnShards(index, nodes, docCount, nodeClient);
}
}


/**
* This test creates a cluster with primary on higher version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider};
* replica shard allocation on lower OpenSearch version is prevented. Thus, this test though cover the use case where
* primary shard containing nodes are running on higher OS version while replicas are unassigned.
*
* @throws Exception
*/
public void testIndexingWithReplicaOnBwcNodes() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
return;
}
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered:\n {}", nodes.toString());
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
// Exclude bwc nodes from allocation so that primaries gets allocated on current/higher version
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.routing.allocation.exclude._name", bwcNames);
final String index = "test-index";
createIndex(index, settings.build());
ensureNoInitializingShards(); // wait for all other shard activity to finish
printClusterRouting();

int docCount = 200;
try (RestClient nodeClient = buildClient(restClientSettings(),
nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

logger.info("allowing replica shards assignment on bwc nodes");
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.exclude._name"));
// Add replicas so that it can be assigned on lower OS version nodes, but it doesn't work as called out in test overview
updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2));
printClusterRouting();

// Index docs
indexDocs(index, 0, docCount);

// perform a refresh
assertOK(client().performRequest(new Request("POST", index + "/_flush")));

// verify replica catch up with primary
assertSeqNoOnShards(index, nodes, docCount, nodeClient);
}
}

public void testIndexVersionPropagation() throws Exception {
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,35 @@
package org.opensearch.upgrades;

import org.apache.http.util.EntityUtils;
import org.apache.http.ParseException;

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Booleans;
import org.opensearch.common.io.Streams;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;


/**
* Basic test that indexed documents survive the rolling restart. See
* {@link RecoveryIT} for much more in depth testing of the mechanism
Expand All @@ -61,7 +72,95 @@
*/
public class IndexingIT extends AbstractRollingTestCase {

public void testIndexing() throws IOException {
private void printClusterNodes() throws IOException, ParseException, URISyntaxException {
Request clusterStateRequest = new Request("GET", "_nodes");
Response response = client().performRequest(clusterStateRequest);

ObjectPath objectPath = ObjectPath.createFromResponse(response);
Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
for (String id : nodesAsMap.keySet()) {
logger.info("--> {} {} {}",
id,
objectPath.evaluate("nodes." + id + ".name"),
Version.fromString(objectPath.evaluate("nodes." + id + ".version")));
}
response = client().performRequest(new Request("GET", "_cluster/state"));
String cm = ObjectPath.createFromResponse(response).evaluate("master_node");
logger.info("--> Cluster manager {}", cm);
}

// Verifies that for each shard copy holds same document count across all containing nodes.
private void waitForSearchableDocs(String index, int shardCount, int replicaCount) throws Exception {
assertTrue(shardCount > 0);
assertTrue(replicaCount > 0);
waitForClusterHealthWithNoShardMigration(index, "green");
logger.info("--> _cat/shards before search \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));

// Verify segment replication stats
verifySegmentStats(index);

// Verify segment store
assertBusy(() -> {
/**
* Use default tabular output and sort response based on shard,segment,primaryOrReplica columns to allow line by
* line parsing where records related to a segment (e.g. _0) are chunked together with first record belonging
* to primary while remaining *replicaCount* records belongs to replica copies
* */
Request segrepStatsRequest = new Request("GET", "/_cat/segments/" + index + "?s=shard,segment,primaryOrReplica");
segrepStatsRequest.addParameter("h", "index,shard,primaryOrReplica,segment,docs.count");
Response segrepStatsResponse = client().performRequest(segrepStatsRequest);
logger.info("--> _cat/segments response\n {}", EntityUtils.toString(client().performRequest(segrepStatsRequest).getEntity()));
List<String> responseList = Streams.readAllLines(segrepStatsResponse.getEntity().getContent());
for (int segmentsIndex=0; segmentsIndex < responseList.size();) {
String[] primaryRow = responseList.get(segmentsIndex++).split(" +");
String shardId = primaryRow[0] + primaryRow[1];
assertTrue(primaryRow[2].equals("p"));
for(int replicaIndex = 1; replicaIndex <= replicaCount; replicaIndex++) {
String[] replicaRow = responseList.get(segmentsIndex).split(" +");
String replicaShardId = replicaRow[0] + replicaRow[1];
// When segment has 0 doc count, not all replica copies posses that segment. Skip to next segment
if (replicaRow[2].equals("p")) {
assertTrue(primaryRow[4].equals("0"));
break;
}
// verify same shard id
assertTrue(replicaShardId.equals(shardId));
// verify replica row
assertTrue(replicaRow[2].equals("r"));
// Verify segment name matches e.g. _0
assertTrue(replicaRow[3].equals(primaryRow[3]));
// Verify doc count matches
assertTrue(replicaRow[4].equals(primaryRow[4]));
segmentsIndex++;
}
}
}, 1, TimeUnit.MINUTES);
}

private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException {
Request waitForStatus = new Request("GET", "/_cluster/health/" + indexName);
waitForStatus.addParameter("wait_for_status", status);
// wait for long enough that we give delayed unassigned shards to stop being delayed
waitForStatus.addParameter("timeout", "70s");
waitForStatus.addParameter("level", "shards");
waitForStatus.addParameter("wait_for_no_initializing_shards", "true");
waitForStatus.addParameter("wait_for_no_relocating_shards", "true");
client().performRequest(waitForStatus);
}

private void verifySegmentStats(String indexName) throws Exception {
assertBusy(() -> {
Request segrepStatsRequest = new Request("GET", "/_cat/segment_replication/" + indexName);
segrepStatsRequest.addParameter("h", "shardId,target_node,checkpoints_behind");
Response segrepStatsResponse = client().performRequest(segrepStatsRequest);
for (String statLine : Streams.readAllLines(segrepStatsResponse.getEntity().getContent())) {
String[] elements = statLine.split(" +");
assertEquals("Replica shard " + elements[0] + "not upto date with primary ", 0, Integer.parseInt(elements[2]));
}
}, 1, TimeUnit.MINUTES);
}

public void testIndexing() throws IOException, ParseException {
switch (CLUSTER_TYPE) {
case OLD:
break;
Expand Down Expand Up @@ -147,6 +246,91 @@ public void testIndexing() throws IOException {
}
}


/**
* This test verifies that during rolling upgrades the segment replication does not break when replica shards can
* be running on older codec versions.
*
* @throws Exception
*/
public void testIndexingWithSegRep() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
return;
}
final String indexName = "test-index-segrep";
final int shardCount = 3;
final int replicaCount = 2;
logger.info("--> Case {}", CLUSTER_TYPE);
printClusterNodes();
logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
switch (CLUSTER_TYPE) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(
EngineConfig.INDEX_CODEC_SETTING.getKey(),
randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC, CodecService.LUCENE_DEFAULT_CODEC)
)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms");
createIndex(indexName, settings.build());
waitForClusterHealthWithNoShardMigration(indexName, "green");
bulk(indexName, "_OLD", 5);
break;
case MIXED:
waitForClusterHealthWithNoShardMigration(indexName, "yellow");
break;
case UPGRADED:
waitForClusterHealthWithNoShardMigration(indexName, "green");
break;
default:
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}

int expectedCount;
switch (CLUSTER_TYPE) {
case OLD:
expectedCount = 5;
break;
case MIXED:
if (Booleans.parseBoolean(System.getProperty("tests.first_round"))) {
expectedCount = 5;
} else {
expectedCount = 10;
}
break;
case UPGRADED:
expectedCount = 15;
break;
default:
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}

waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount);

if (CLUSTER_TYPE != ClusterType.OLD) {
logger.info("--> Bulk index 5 documents");
bulk(indexName, "_" + CLUSTER_TYPE, 5);
logger.info("--> Index one doc (to be deleted next) and verify doc count");
Request toBeDeleted = new Request("PUT", "/" + indexName + "/_doc/to_be_deleted");
toBeDeleted.addParameter("refresh", "true");
toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}");
client().performRequest(toBeDeleted);
waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount + 6);

logger.info("--> Delete previously added doc and verify doc count");
Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted");
delete.addParameter("refresh", "true");
client().performRequest(delete);
waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount + 5);
}
}

public void testAutoIdWithOpTypeCreate() throws IOException {
final String indexName = "auto_id_and_op_type_create_index";
StringBuilder b = new StringBuilder();
Expand Down

0 comments on commit 90d1da2

Please sign in to comment.