Skip to content

Commit

Permalink
Add integ test with network disruption and refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
Anshu Agarwal committed Nov 10, 2022
1 parent c0e5b67 commit f3006a5
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,36 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase {

@Override
protected int numberOfReplicas() {
return 2;
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
}

public void testSearchWithWRRShardRouting() throws IOException {
Expand Down Expand Up @@ -105,7 +115,8 @@ public void testSearchWithWRRShardRouting() throws IOException {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}
// search should not go to nodes in zone c
// search should not go to nodes in zone c with weight zero in case
// shard copies are available in other zones
assertThat(hitNodes.size(), lessThanOrEqualTo(4));
DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();
List<String> nodeIdsFromZoneWithWeightZero = new ArrayList<>();
Expand Down Expand Up @@ -162,7 +173,16 @@ public void testSearchWithWRRShardRouting() throws IOException {
}
}

public void testFailOpenOnSearch() throws IOException {
/**
 * Shard search requests are served by data nodes in an AZ whose weight is set to 0
 * when shard copies are not available in any other AZ.
 * This is tested by setting up a 4 node cluster with one data node per AZ.
 * The weighted shard routing weight is set to 0 for az-c.
 * Data nodes in zones a and b are stopped, and
 * assertions are made to ensure that shard search requests do not fail.
 * @throws IOException
 */
public void testFailOpenByStoppingDataNodes() throws IOException {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
Expand Down Expand Up @@ -198,14 +218,13 @@ public void testFailOpenOnSearch() throws IOException {
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2))
);
ensureGreen();

logger.info("--> creating indices for test");
for (int i = 0; i < 100; i++) {
client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get();
}
refresh("test");

ClusterState state1 = internalCluster().clusterService().state();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
Expand All @@ -217,6 +236,7 @@ public void testFailOpenOnSearch() throws IOException {
.get();
assertEquals(response.isAcknowledged(), true);

logger.info("--> data nodes in zone a and b are stopped");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_b.get(0)));
ensureStableCluster(2);
Expand All @@ -228,18 +248,21 @@ public void testFailOpenOnSearch() throws IOException {
.prepareSearch("test")
.setQuery(QueryBuilders.matchAllQuery())
.get();
// assert that searches do not fail and are served by data node in zone c
assertEquals(searchResponse.getFailedShards(), 0);
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}

ImmutableOpenMap<String, DiscoveryNode> dataNodes = internalCluster().clusterService().state().nodes().getDataNodes();
List<String> nodeIdsFromZoneWithWeightZero = new ArrayList<>();
String dataNodeInZoneCID = null;

for (Iterator<DiscoveryNode> it = dataNodes.valuesIt(); it.hasNext();) {
DiscoveryNode node = it.next();
if (node.getAttributes().get("zone").equals("c")) {
nodeIdsFromZoneWithWeightZero.add(node.getId());
dataNodeInZoneCID = node.getId();
break;
}
}

Expand All @@ -249,8 +272,150 @@ public void testFailOpenOnSearch() throws IOException {
if (stat.getNode().isDataNode()) {
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
assertTrue(stat.getNode().getId().equals(dataNodeInZoneCID));
}
}
}

/**
 * Shard search requests are served by data nodes in an AZ whose weight is set to 0
 * when shard copies are not available in any other AZ.
 * This is tested by setting up a 4 node cluster (one dedicated cluster manager node and
 * one data node per AZ). The weighted shard routing weight is set to 0 for az-c.
 * The index is created with one replica copy and a network disruption is introduced
 * which makes the node in zone a unresponsive.
 * Since there are only two copies of each shard, there can be a few shards for which
 * no copy exists in zone b.
 * Assertions are made to ensure such shard search requests are served by the data node in zone c.
 *
 * @throws Exception if cluster setup, the network disruption, or a search request fails unexpectedly
 */
public void testFailOpenWithUnresponsiveNetworkDisruption() throws Exception {

    final int nodeCountPerAZ = 1;
    final int docCount = 50;
    final int searchRequestCount = 50;

    Settings commonSettings = Settings.builder()
        .put("cluster.routing.allocation.awareness.attributes", "zone")
        .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
        .build();

    logger.info("--> starting a dedicated cluster manager node");
    internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

    logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
    List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
        nodeCountPerAZ,
        Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
    );
    List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
        nodeCountPerAZ,
        Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
    );
    List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
        nodeCountPerAZ,
        Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
    );

    logger.info("--> waiting for nodes to form a cluster");
    ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
    assertThat(health.isTimedOut(), equalTo(false));

    ensureGreen();

    assertAcked(
        prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 10).put("index.number_of_replicas", 1))
    );
    ensureGreen();
    logger.info("--> creating indices for test");
    for (int i = 0; i < docCount; i++) {
        client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get();
    }
    refresh("test");

    logger.info("--> setting shard routing weights for weighted round robin");
    Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
    WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

    ClusterPutWeightedRoutingResponse response = client().admin()
        .cluster()
        .prepareWeightedRouting()
        .setWeightedRouting(weightedRouting)
        .get();
    assertTrue(response.isAcknowledged());

    logger.info("--> creating network partition disruption");
    // Isolate the zone a data node from the cluster manager and the data nodes in zones b and c.
    final String clusterManagerNode1 = internalCluster().getClusterManagerName();
    Set<String> nodesInOneSide = Stream.of(clusterManagerNode1, nodes_in_zone_b.get(0), nodes_in_zone_c.get(0))
        .collect(Collectors.toCollection(HashSet::new));
    Set<String> nodesInOtherSide = Stream.of(nodes_in_zone_a.get(0)).collect(Collectors.toCollection(HashSet::new));

    NetworkDisruption networkDisruption = new NetworkDisruption(
        new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
        NetworkDisruption.UNRESPONSIVE
    );
    internalCluster().setDisruptionScheme(networkDisruption);

    logger.info("--> network disruption is started");
    networkDisruption.startDisrupting();

    Set<String> hitNodes = new HashSet<>();
    // A typed list avoids the raw generic array (and its unchecked assignment).
    List<Future<SearchResponse>> responses = new ArrayList<>(searchRequestCount);
    logger.info("--> making search requests");
    // Searches are submitted asynchronously while the disruption is active and awaited afterwards.
    for (int i = 0; i < searchRequestCount; i++) {
        responses.add(
            internalCluster().client(nodes_in_zone_b.get(0)).prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute()
        );
    }

    logger.info("--> network disruption is stopped");
    networkDisruption.stopDisrupting();

    for (Future<SearchResponse> pendingResponse : responses) {
        try {
            SearchResponse searchResponse = pendingResponse.get();
            assertEquals(0, searchResponse.getFailedShards());
            for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
                hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so a failing run shows why the search failed.
            throw new AssertionError("search should not fail", e);
        }
    }

    ImmutableOpenMap<String, DiscoveryNode> dataNodes = internalCluster().clusterService().state().nodes().getDataNodes();
    String dataNodeInZoneAId = null;
    String dataNodeInZoneBId = null;
    String dataNodeInZoneCId = null;
    for (Iterator<DiscoveryNode> it = dataNodes.valuesIt(); it.hasNext();) {
        DiscoveryNode node = it.next();
        switch (node.getAttributes().get("zone")) {
            case "a":
                dataNodeInZoneAId = node.getId();
                break;
            case "b":
                dataNodeInZoneBId = node.getId();
                break;
            case "c":
                dataNodeInZoneCId = node.getId();
                break;
            default:
                // Data nodes in any other zone are not relevant to this test.
                break;
        }
    }

    NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
    for (NodeStats stat : nodeStats.getNodes()) {
        SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
        if (stat.getNode().isDataNode()) {
            if (stat.getNode().getId().equals(dataNodeInZoneBId)) {
                Assert.assertTrue(searchStats.getQueryCount() > 0L);
                Assert.assertTrue(searchStats.getFetchCount() > 0L);
            } else if (stat.getNode().getId().equals(dataNodeInZoneCId)) {
                // Fail open: the zone c node (weight 0) must have served query phases.
                Assert.assertTrue(searchStats.getQueryCount() > 0L);
            } else {
                // search requests do not hit data node in zone a
                assertEquals(0, searchStats.getQueryCount());
                assertEquals(0, searchStats.getFetchCount());
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,21 @@ private ShardRouting nextRoutingOrNull(Exception failure) {
}
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();

if (next != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(next.currentNodeId(), clusterService.state())
&& !WeightedRoutingHelper.isInternalFailure(failure)) {
// This checks if the shard is present in data node with weighted routing weight set to 0,
// In such cases we fail open, if shard search request for the shard from other shard copies fail with non
// retryable exception.
while (next != null && WeightedRoutingHelper.shardInWeighedAwayAZ(next.currentNodeId(), clusterService.state())) {
if (WeightedRoutingHelper.isInternalFailure(failure)) {
ShardRouting shardToFailOpen = next;
logger.info(
() -> new ParameterizedMessage(
"{}: Fail open executed",
shardToFailOpen.shardId()

)
);
break;
}
next = shardsIt.get(shardIndex).nextOrNull();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,21 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
SearchShardTarget nextShard = shardIt.nextOrNull();
while (nextShard != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard.getNodeId(), clusterState)
&& !WeightedRoutingHelper.isInternalFailure(e)) {
// This checks if the shard is present in data node with weighted routing weight set to 0,
// In such cases we fail open, if shard search request for the shard from other shard copies fail with non
// retryable exception.
while (nextShard != null && WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard.getNodeId(), clusterState)) {
if (WeightedRoutingHelper.isInternalFailure(e)) {
SearchShardTarget shardToFailOpen = nextShard;
logger.info(
() -> new ParameterizedMessage(
"{}: Fail open executed",
shardToFailOpen.getShardId()

)
);
break;
}
nextShard = shardIt.nextOrNull();
}
final boolean lastShard = nextShard == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,21 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
setFailure(shardIt, shardIndex, e);
ShardRouting nextShard = shardIt.nextOrNull();

if (nextShard != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard.currentNodeId(), clusterState)
&& !WeightedRoutingHelper.isInternalFailure(e)) {
// This checks if the shard is present in data node with weighted routing weight set to 0,
// In such cases we fail open, if shard search request for the shard from other shard copies fail with non
// retryable exception.
while (nextShard != null && WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard.currentNodeId(), clusterState)) {
if (WeightedRoutingHelper.isInternalFailure(e)) {
ShardRouting shardToFailOpen = nextShard;
logger.info(
() -> new ParameterizedMessage(
"{}: Fail open executed",
shardToFailOpen.shardId()

)
);
break;
}
nextShard = shardIt.nextOrNull();
}
if (nextShard != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,23 @@ private void perform(@Nullable final Exception currentFailure) {
this.lastFailure = currentFailure;
}
ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(shardRouting.currentNodeId(), clusterService.state())
&& !WeightedRoutingHelper.isInternalFailure(currentFailure)) {

// This checks if the shard is present in data node with weighted routing weight set to 0,
// In such cases we fail open, if shard search request for the shard from other shard copies fail with non
// retryable exception.
while (shardRouting != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(shardRouting.currentNodeId(), clusterService.state())) {
if (WeightedRoutingHelper.isInternalFailure(currentFailure)) {
ShardRouting shardToFailOpen = shardRouting;
logger.info(
() -> new ParameterizedMessage(
"{}: Fail open executed",
shardToFailOpen.shardId()

)
);
break;
}
shardRouting = shardIt.nextOrNull();
}
if (shardRouting == null) {
Expand Down
Loading

0 comments on commit f3006a5

Please sign in to comment.