From 365cae7d0615a460eff929b4a3a4aa641f7f1533 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 14 Jun 2022 10:13:22 +0530 Subject: [PATCH] Add flat_skew setting to node overload decider (#3563) * Add flat_skew setting to node overload decider Signed-off-by: Rishab Nahata (cherry picked from commit 836a9c4910f5eb4e7fcaca5573bd3bf47073d416) --- .../allocation/AwarenessAllocationIT.java | 139 +++++++++++ .../NodeLoadAwareAllocationDecider.java | 26 +- .../common/settings/ClusterSettings.java | 1 + .../NodeLoadAwareAllocationTests.java | 222 ++++++++++++++---- 4 files changed, 338 insertions(+), 50 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java index 224db09d99a99..2b73c5da27606 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java @@ -45,14 +45,17 @@ import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; @@ -351,4 +354,140 @@ public void testAwarenessZonesIncrementalNodes() { assertThat(counts.get(B_1), equalTo(2)); assertThat(counts.get(noZoneNode), equalTo(2)); } + + public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws Exception { + int nodeCountPerAZ = 5; + int numOfShards = 30; + int numOfReplica = 1; + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.allocation.load_awareness.skew_factor", "0.0") + .put("cluster.routing.allocation.load_awareness.provisioned_capacity", Integer.toString(nodeCountPerAZ * 3)) + .build(); + + logger.info("--> starting 15 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + // Creating index with 30 primary and 1 replica + createIndex( + "test-1", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica) + .build() + ); + + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setIndices("test-1") + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(nodeCountPerAZ * 3)) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + ObjectIntHashMap counts = new ObjectIntHashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1); + } + } + } + + assertThat(counts.size(), equalTo(nodeCountPerAZ * 3)); + // All shards should be started + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(numOfShards * (numOfReplica + 1))); + + // stopping half nodes in zone a + int nodesToStop = nodeCountPerAZ / 2; + List nodeDataPathSettings = new ArrayList<>(); + for (int i = 0; i < nodesToStop; i++) { + nodeDataPathSettings.add(internalCluster().dataPathSettings(nodes_in_zone_a.get(i))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(i))); + } + + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + health = client().admin() + .cluster() + .prepareHealth() + .setIndices("test-1") + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop)) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + // Creating another index with 30 primary and 1 replica + createIndex( + "test-2", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica) + .build() + ); + + health = client().admin() + .cluster() + .prepareHealth() + .setIndices("test-1", "test-2") + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop)) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + // Restarting the nodes back + for (int i = 0; i < nodesToStop; i++) { + internalCluster().startNode( + Settings.builder() + .put("node.name", nodes_in_zone_a.get(i)) + .put(nodeDataPathSettings.get(i)) + .put(commonSettings) + .put("node.attr.zone", "a") + .build() + ); + } + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + + health = client().admin() + .cluster() + .prepareHealth() + .setIndices("test-1", "test-2") + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCountPerAZ * 3)) + .setWaitForGreenStatus() + .setWaitForActiveShards(2 * numOfShards * (numOfReplica + 1)) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .execute() + .actionGet(); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + // All shards should be started now and cluster health should be green + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1))); + assertThat(health.isTimedOut(), equalTo(false)); + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java index 8e2824163709d..c43fb3be214a9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java @@ -33,11 +33,13 @@ * *

* and prevent allocation on the surviving nodes of the under capacity cluster - * based on overload factor defined as a percentage by + * based on overload factor defined as a percentage and flat skew as absolute allowed skewness by + *

*
  * cluster.routing.allocation.load_awareness.skew_factor: X
+ * cluster.routing.allocation.load_awareness.flat_skew: N
  * 
- * The total limit per node based on skew_factor doesn't limit primaries that previously + * The total limit per node based on skew_factor and flat_skew doesn't limit primaries that previously * existed on the disk as those shards are force allocated by * {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)} * however new primaries due to index creation, snapshot restore etc can be controlled via the below settings. @@ -74,6 +76,13 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider { Setting.Property.Dynamic, Property.NodeScope ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING = Setting.intSetting( + "cluster.routing.allocation.load_awareness.flat_skew", + 2, + 2, + Property.Dynamic, + Property.NodeScope + ); private volatile int provisionedCapacity; @@ -81,12 +90,15 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider { private volatile boolean allowUnassignedPrimaries; + private volatile int flatSkew; + private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class); public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) { this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings); this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings); this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings); + this.flatSkew = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, this::setSkewFactor); clusterSettings.addSettingsUpdateConsumer( CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, @@ -96,6 +108,7 @@ public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings cluster CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING, this::setAllowUnassignedPrimaries ); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING, this::setFlatSkew); } private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) { @@ -110,6 +123,10 @@ private void setProvisionedCapacity(int provisionedCapacity) { this.provisionedCapacity = provisionedCapacity; } + private void setFlatSkew(int flatSkew) { + this.flatSkew = flatSkew; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit); @@ -146,7 +163,7 @@ private Decision underCapacity( Metadata metadata = allocation.metadata(); float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity; int nodeShardCount = node.numberOfOwningShards(); - int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0)); + int limit = flatSkew + (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0)); if (decider.test(nodeShardCount, limit)) { logger.debug( () -> new ParameterizedMessage( @@ -163,10 +180,11 @@ private Decision underCapacity( Decision.NO, NAME, "too many shards [%d] allocated to this node, limit per node [%d] considering" - + " overload factor [%.2f] based on capacity [%d]", + + " overload factor [%.2f] and flat skew [%d] based on capacity [%d]", nodeShardCount, limit, skewFactor, + flatSkew, provisionedCapacity ); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 9da423cd01fe3..617c5a9868409 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -558,6 +558,7 @@ public void apply(Settings value, Settings current, Settings previous) { NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING, ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED, ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED, ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java index d2e7e0e7e636a..c4dcae84581cb 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java @@ -22,7 +22,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; -import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; @@ -106,9 +105,11 @@ public void testNewUnassignedPrimaryAllocationOnOverload() { .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1")))) .build(); - // 4 existing shards from this node's local store get started + // 4 existing shards from this node's local store get started and cluster rebalances newState = strategy.reroute(newState, "reroute"); - newState = startInitializingShardsAndReroute(strategy, newState); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32)); // add back node2 when skewness is still breached @@ -282,11 +283,14 @@ public void testExistingPrimariesAllocationOnOverload() { newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); newState = strategy.reroute(newState, "reroute"); - newState = startInitializingShardsAndReroute(strategy, newState); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + // 28 shards should be assigned (14 on each node -> 8 * 1.5 + 2) logger.info("limits should be applied on newly create primaries"); - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(16)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(12)); assertEquals( 12L, @@ -298,7 +302,7 @@ public void testExistingPrimariesAllocationOnOverload() { ); assertEquals( - 4L, + 0L, newState.getRoutingNodes() .shardsWithState(UNASSIGNED) .stream() @@ -306,7 +310,7 @@ public void testExistingPrimariesAllocationOnOverload() { .count() ); - assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); + assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(14)); logger.info("--> Remove node4 from zone holding primaries"); newState = removeNodes(newState, strategy, "node4"); @@ -339,10 +343,10 @@ public void testExistingPrimariesAllocationOnOverload() { logger.info("--> do another reroute, make sure nothing moves"); assertThat(strategy.reroute(newState, "reroute").routingTable(), sameInstance(newState.routingTable())); - assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); - assertThat(newState.getRoutingNodes().node("node5").size(), equalTo(12)); + assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(14)); + assertThat(newState.getRoutingNodes().node("node5").size(), equalTo(14)); - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28)); newState = ClusterState.builder(newState) .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1")))) @@ -436,7 +440,8 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { newState = startInitializingShardsAndReroute(strategy, newState); } - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(30)); + // Each node can take 12 shards each (2 + ceil(8*1.2)) + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36)); for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); @@ -458,10 +463,12 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); newState = strategy.reroute(newState, "reroute"); - newState = startInitializingShardsAndReroute(strategy, newState); + while (!newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()) { + newState = startInitializingShardsAndReroute(strategy, newState); + } - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(66)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(14)); logger.info("add another index with 60 shards"); metadata = Metadata.builder(newState.metadata()) @@ -482,8 +489,8 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { newState = startInitializingShardsAndReroute(strategy, newState); } - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(126)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(14)); logger.info("change settings to allow unassigned primaries"); strategy = createAllocationServiceWithAdditionalSettings( @@ -499,7 +506,7 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { ); for (RoutingNode node : newState.getRoutingNodes()) { - assertThat(node.size(), equalTo(40)); + assertThat(node.size(), equalTo(42)); } logger.info("add another index with 5 shards"); @@ -513,15 +520,15 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { ) .build(); updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test3")).build(); - // increases avg shard per node to 145/5 = 29, overload factor 1.2, total allowed 35 per node and NO primaries get assigned - // since total owning shards are 40 per node already + // increases avg shard per node to 145/5 = 29, overload factor 1.2, total allowed 35+2=37 per node and NO primaries get assigned + // since total owning shards are 42 per node already newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); newState = strategy.reroute(newState, "reroute"); newState = startInitializingShardsAndReroute(strategy, newState); - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(25)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(126)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(19)); assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().filter(ShardRouting::primary).count(), equalTo(5L)); } @@ -600,21 +607,24 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload() { newState = startInitializingShardsAndReroute(strategy, newState); } - assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); - assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(7)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(7)); // add the removed node newState = addNodes(newState, strategy, "zone3", "node11"); - assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); - newState = startInitializingShardsAndReroute(strategy, newState); - assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(6)); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); // add the removed node newState = addNodes(newState, strategy, "zone3", "node12"); - assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); - newState = startInitializingShardsAndReroute(strategy, newState); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); // add the removed node @@ -674,13 +684,14 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload() { logger.info("--> add five new node in new zone and reroute"); clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + // Each node can take 7 shards each now (2 + ceil(4*1.2)) assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(30)); - assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(25)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(30)); logger.info("--> complete relocation"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); - assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(55)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(60)); logger.info("--> do another reroute, make sure nothing moves"); assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); @@ -707,6 +718,7 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload() { newState = startInitializingShardsAndReroute(strategy, newState); } + // Each node can now have 5 shards each assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); @@ -791,8 +803,9 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() newState = startInitializingShardsAndReroute(strategy, newState); } // ensure minority zone doesn't get overloaded - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(53)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10)); + // each node can take 10 shards each (2 + ceil(7*1.1)) + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(61)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); } @@ -912,15 +925,20 @@ public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { clusterState = startInitializingShardsAndReroute(strategy, clusterState); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); - // assert replicas are not assigned but primaries are - logger.info("--> replicas are not initializing"); - assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + // Each node can take 11 shards each (2 + ceil(8*1.1)), hence 2 replicas will also start + logger.info("--> 2 replicas are initializing"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED); assertFalse(shard.primary()); } + logger.info("--> start the shards (replicas)"); + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } + logger.info("--> do another reroute, make sure nothing moves"); assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); @@ -929,10 +947,12 @@ public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(18)); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } logger.info("--> replicas are started"); - assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(38)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED); @@ -1012,11 +1032,12 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() newState = startInitializingShardsAndReroute(strategy, newState); } - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(50)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10)); + // Each node can take 7 shards max ( 2 + ceil(4*1.2)) + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); for (RoutingNode node : newState.getRoutingNodes()) { - assertThat(node.size(), equalTo(5)); + assertThat(node.size(), equalTo(6)); } // add the removed node @@ -1025,9 +1046,7 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() .build(); newState = strategy.reroute(newState, "reroute"); - assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); newState = startInitializingShardsAndReroute(strategy, newState); - assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); // add the removed node newState = ClusterState.builder(newState) @@ -1035,9 +1054,7 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() .build(); newState = strategy.reroute(newState, "reroute"); - assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); newState = startInitializingShardsAndReroute(strategy, newState); - assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); // add the removed node newState = ClusterState.builder(newState) @@ -1068,6 +1085,120 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); } + public void testThreeZoneOneReplicaWithSkewFactorZeroAllShardsAssignedAfterRecovery() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 0, + "cluster.routing.allocation.awareness.force.zone.values", + "zone1,zone2,zone3" + ) + ); + + logger.info("Building initial routing table for 'testThreeZoneOneReplicaWithSkewFactorZeroAllShardsAssignedAfterRecovery'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(30).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(30)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> add five new node in new zone and reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); + + logger.info("--> Removing three nodes from zone3"); + newState = removeNodes(newState, strategy, "node11", "node12", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + // Each node can take 6 shards max (2 + ceil(4*1.0)), so all shards should be assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + + logger.info("add another index with 30 primary 1 replica"); + metadata = Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder("test1") + .settings( + settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 30) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + ) + ) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test1")).build(); + + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node11", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node12", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node13", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(8)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(8)); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(8)); + // ensure all shards are assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + private ClusterState removeNodes(ClusterState clusterState, AllocationService allocationService, String... nodeIds) { DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.getNodes()); org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.remove(nodeId)); @@ -1097,7 +1228,6 @@ private Settings buildSettings(Map settingsValue) { .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "zone"); settingsValue.forEach((k, v) -> { if (v instanceof Integer) settingsBuilder.put(k, (Integer) (v));