Skip to content

Commit

Permalink
[Segment Replication] Rolling upgrade test
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed May 15, 2023
1 parent a37cdc3 commit ec67769
Showing 1 changed file with 164 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,34 @@

package org.opensearch.upgrades;

import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Booleans;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
Expand All @@ -62,6 +74,81 @@
*/
public class IndexingIT extends AbstractRollingTestCase {

/**
 * Logs the id, name and version of every node reported by the {@code _nodes} API, followed by
 * the {@code master_node} id reported by {@code _cluster/state}.
 */
private void printClusterNodes() throws IOException, ParseException, URISyntaxException {
    final Response nodesResponse = client().performRequest(new Request("GET", "_nodes"));
    final ObjectPath nodesPath = ObjectPath.createFromResponse(nodesResponse);
    final Map<String, Object> nodesById = nodesPath.evaluate("nodes");

    for (String nodeId : nodesById.keySet()) {
        final String nodePrefix = "nodes." + nodeId;
        final Object nodeName = nodesPath.evaluate(nodePrefix + ".name");
        final String rawVersion = nodesPath.evaluate(nodePrefix + ".version");
        logger.info("--> {} {} {}", nodeId, nodeName, Version.fromString(rawVersion));
    }

    final Response stateResponse = client().performRequest(new Request("GET", "_cluster/state"));
    final String clusterManagerId = ObjectPath.createFromResponse(stateResponse).evaluate("master_node");
    logger.info("--> Cluster manager {}", clusterManagerId);
}

/**
 * Verifies that, for every shard of {@code index}, the replica copy eventually reports the same
 * searchable document count as the primary copy.
 * <p>
 * The node hosting each copy is read once from the shard-level {@code _stats} response. For each
 * shard the primary's hit count is then sampled once, and the replica is polled (for up to one
 * minute) until it returns the same count.
 *
 * @param index      name of the index to verify
 * @param shardCount number of shards of the index; shards {@code 0..shardCount-1} are checked
 * @throws Exception if a request fails, a shard has no primary or replica copy listed in the
 *                   stats response, or the replica does not converge within the timeout
 */
private void waitForSearchableDocs(String index, int shardCount) throws Exception {
    Map<Integer, String> primaryShardToNodeIDMap = new HashMap<>();
    Map<Integer, String> replicaShardToNodeIDMap = new HashMap<>();
    logger.info("--> _cat/shards \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));

    Request request = new Request("GET", index + "/_stats");
    request.addParameter("level", "shards");
    Response response = client().performRequest(request);
    // Parse the stats response a single time; it is the same document for every shard lookup.
    final ObjectPath statsPath = ObjectPath.createFromResponse(response);
    for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
        List<Object> shardStats = statsPath.evaluate("indices." + index + ".shards." + shardNumber);
        assertNotNull("No stats reported for shard [" + shardNumber + "] of index [" + index + "]", shardStats);
        for (Object shard : shardStats) {
            final String nodeId = ObjectPath.evaluate(shard, "routing.node");
            final Boolean primary = ObjectPath.evaluate(shard, "routing.primary");
            if (primary) {
                primaryShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);
            } else {
                // Only the first replica copy seen for a shard is remembered.
                replicaShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);
            }
        }
    }
    logger.info("--> primaryShardToNodeIDMap {}", primaryShardToNodeIDMap);
    logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap);

    for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
        logger.info("--> Verify doc count for shard number {}", shardNumber);
        final int shardNum = shardNumber;
        final String primaryNodeId = primaryShardToNodeIDMap.get(shardNum);
        final String replicaNodeId = replicaShardToNodeIDMap.get(shardNum);
        // Fail with a readable message rather than searching with an "_only_nodes:null" preference.
        assertNotNull("No node hosts the primary copy of shard [" + shardNum + "] of index [" + index + "]", primaryNodeId);
        assertNotNull("No node hosts a replica copy of shard [" + shardNum + "] of index [" + index + "]", replicaNodeId);

        final int primaryHits = searchableDocCountOnNode(index, shardNum, primaryNodeId);
        logger.info("--> primaryHits {}", primaryHits);
        assertBusy(() -> {
            int replicaHits = searchableDocCountOnNode(index, shardNum, replicaNodeId);
            logger.info("--> ReplicaHits {}", replicaHits);
            assertEquals(primaryHits, replicaHits);
        }, 1, TimeUnit.MINUTES);
    }
}

/**
 * Returns the total hit count of a search against {@code index} that is restricted, through the
 * {@code preference} parameter, to one shard number and one node id.
 */
private int searchableDocCountOnNode(String index, int shardNumber, String nodeId) throws Exception {
    Request searchRequest = new Request("POST", "/" + index + "/_search");
    searchRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
    searchRequest.addParameter("filter_path", "hits.total");
    searchRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + nodeId);
    Response searchResponse = client().performRequest(searchRequest);
    final int hits = ObjectPath.createFromResponse(searchResponse).evaluate("hits.total");
    return hits;
}

/**
 * Sends a cluster health request for {@code indexName} that waits for the given {@code status}
 * and for there to be no initializing and no relocating shards.
 *
 * @param indexName index whose health is requested
 * @param status    value passed as {@code wait_for_status}
 * @throws IOException if the health request fails
 */
private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException {
    final Request healthRequest = new Request("GET", "/_cluster/health/" + indexName);
    healthRequest.addParameter("level", "shards");
    healthRequest.addParameter("wait_for_status", status);
    healthRequest.addParameter("wait_for_no_initializing_shards", "true");
    healthRequest.addParameter("wait_for_no_relocating_shards", "true");
    // wait long enough for delayed unassigned shards to stop being delayed
    healthRequest.addParameter("timeout", "70s");
    client().performRequest(healthRequest);
}

public void testIndexing() throws IOException, ParseException {
switch (CLUSTER_TYPE) {
case OLD:
Expand Down Expand Up @@ -148,6 +235,83 @@ public void testIndexing() throws IOException, ParseException {
}
}


/**
 * Checks that segment replication keeps working throughout a rolling upgrade, during which
 * replica shards can be running on older codec versions.
 * <p>
 * On the old cluster the index is created (3 shards, 1 replica, segment replication) and seeded
 * with 5 documents. Every stage then checks that shard copies agree on the document count and
 * that the total matches what earlier stages left behind. Each non-old stage additionally
 * bulk-indexes 5 documents, then indexes and deletes one extra document, re-verifying the count
 * after each step.
 *
 * @throws Exception if a cluster request fails or a document count does not match
 */
public void testIndexingWithSegRep() throws Exception {
    final String indexName = "test-index-segrep";
    final int shardCount = 3;
    final int replicaCount = 1;
    logger.info("--> Case {}", CLUSTER_TYPE);
    printClusterNodes();
    logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));

    // Stage-specific setup together with the document count expected at the start of the stage.
    final int expectedCount;
    switch (CLUSTER_TYPE) {
        case OLD: {
            final Settings indexSettings = Settings.builder()
                .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount)
                .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount)
                .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
                .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
                .build();
            createIndex(indexName, indexSettings);
            waitForClusterHealthWithNoShardMigration(indexName, "green");
            bulk(indexName, "_OLD", 5);
            expectedCount = 5;
            break;
        }
        case MIXED:
            waitForClusterHealthWithNoShardMigration(indexName, "yellow");
            // The first mixed round only sees the old-cluster docs; the second also sees round one's.
            expectedCount = Booleans.parseBoolean(System.getProperty("tests.first_round")) ? 5 : 10;
            break;
        case UPGRADED:
            waitForClusterHealthWithNoShardMigration(indexName, "green");
            expectedCount = 15;
            break;
        default:
            throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
    }

    waitForSearchableDocs(indexName, shardCount);
    assertCount(indexName, expectedCount);

    if (CLUSTER_TYPE != ClusterType.OLD) {
        final String disposableDocPath = "/" + indexName + "/_doc/to_be_deleted";

        logger.info("--> Index one doc (to be deleted next) and verify doc count");
        bulk(indexName, "_" + CLUSTER_TYPE, 5);
        final Request indexDisposableDoc = new Request("PUT", disposableDocPath);
        indexDisposableDoc.addParameter("refresh", "true");
        indexDisposableDoc.setJsonEntity("{\"f1\": \"delete-me\"}");
        client().performRequest(indexDisposableDoc);
        waitForSearchableDocs(indexName, shardCount);
        // 5 bulk documents plus the single disposable document
        assertCount(indexName, expectedCount + 6);

        logger.info("--> Delete previously added doc and verify doc count");
        final Request deleteDisposableDoc = new Request("DELETE", disposableDocPath);
        deleteDisposableDoc.addParameter("refresh", "true");
        client().performRequest(deleteDisposableDoc);
        waitForSearchableDocs(indexName, shardCount);
        assertCount(indexName, expectedCount + 5);
    }
    logger.info("--> _cat/shards post execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
}

public void testAutoIdWithOpTypeCreate() throws IOException {
final String indexName = "auto_id_and_op_type_create_index";
StringBuilder b = new StringBuilder();
Expand Down

0 comments on commit ec67769

Please sign in to comment.