Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add flat_skew setting to node overload decider #3582

Merged
merged 1 commit into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -351,4 +354,140 @@ public void testAwarenessZonesIncrementalNodes() {
assertThat(counts.get(B_1), equalTo(2));
assertThat(counts.get(noZoneNode), equalTo(2));
}

public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws Exception {
int nodeCountPerAZ = 5;
int numOfShards = 30;
int numOfReplica = 1;
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.allocation.load_awareness.skew_factor", "0.0")
.put("cluster.routing.allocation.load_awareness.provisioned_capacity", Integer.toString(nodeCountPerAZ * 3))
.build();

logger.info("--> starting 15 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

// Creating index with 30 primary and 1 replica
createIndex(
"test-1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica)
.build()
);

ClusterHealthResponse health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test-1")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3))
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.execute()
.actionGet();
assertFalse(health.isTimedOut());

ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();

for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
}
}
}

assertThat(counts.size(), equalTo(nodeCountPerAZ * 3));
// All shards should be started
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(numOfShards * (numOfReplica + 1)));

// stopping half nodes in zone a
int nodesToStop = nodeCountPerAZ / 2;
List<Settings> nodeDataPathSettings = new ArrayList<>();
for (int i = 0; i < nodesToStop; i++) {
nodeDataPathSettings.add(internalCluster().dataPathSettings(nodes_in_zone_a.get(i)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(i)));
}

client().admin().cluster().prepareReroute().setRetryFailed(true).get();
health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test-1")
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop))
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.execute()
.actionGet();
assertFalse(health.isTimedOut());

// Creating another index with 30 primary and 1 replica
createIndex(
"test-2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica)
.build()
);

health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test-1", "test-2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop))
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.execute()
.actionGet();
assertFalse(health.isTimedOut());

// Restarting the nodes back
for (int i = 0; i < nodesToStop; i++) {
internalCluster().startNode(
Settings.builder()
.put("node.name", nodes_in_zone_a.get(i))
.put(nodeDataPathSettings.get(i))
.put(commonSettings)
.put("node.attr.zone", "a")
.build()
);
}
client().admin().cluster().prepareReroute().setRetryFailed(true).get();

health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test-1", "test-2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3))
.setWaitForGreenStatus()
.setWaitForActiveShards(2 * numOfShards * (numOfReplica + 1))
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.execute()
.actionGet();
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

// All shards should be started now and cluster health should be green
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1)));
assertThat(health.isTimedOut(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
* </pre>
* <p>
* and prevent allocation on the surviving nodes of the under capacity cluster
* based on overload factor defined as a percentage by
* based on overload factor defined as a percentage and flat skew as absolute allowed skewness by
* </p>
* <pre>
* cluster.routing.allocation.load_awareness.skew_factor: X
* cluster.routing.allocation.load_awareness.flat_skew: N
* </pre>
* The total limit per node based on skew_factor doesn't limit primaries that previously
* The total limit per node based on skew_factor and flat_skew doesn't limit primaries that previously
* existed on the disk as those shards are force allocated by
* {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}
* however new primaries due to index creation, snapshot restore etc can be controlled via the below settings.
Expand Down Expand Up @@ -74,19 +76,29 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
Setting.Property.Dynamic,
Property.NodeScope
);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING = Setting.intSetting(
"cluster.routing.allocation.load_awareness.flat_skew",
2,
2,
Property.Dynamic,
Property.NodeScope
);

private volatile int provisionedCapacity;

private volatile double skewFactor;

private volatile boolean allowUnassignedPrimaries;

private volatile int flatSkew;

private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class);

public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings);
this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings);
this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings);
this.flatSkew = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, this::setSkewFactor);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
Expand All @@ -96,6 +108,7 @@ public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings cluster
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
this::setAllowUnassignedPrimaries
);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING, this::setFlatSkew);
}

private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) {
Expand All @@ -110,6 +123,10 @@ private void setProvisionedCapacity(int provisionedCapacity) {
this.provisionedCapacity = provisionedCapacity;
}

private void setFlatSkew(int flatSkew) {
this.flatSkew = flatSkew;
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit);
Expand Down Expand Up @@ -146,7 +163,7 @@ private Decision underCapacity(
Metadata metadata = allocation.metadata();
float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity;
int nodeShardCount = node.numberOfOwningShards();
int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
int limit = flatSkew + (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
if (decider.test(nodeShardCount, limit)) {
logger.debug(
() -> new ParameterizedMessage(
Expand All @@ -163,10 +180,11 @@ private Decision underCapacity(
Decision.NO,
NAME,
"too many shards [%d] allocated to this node, limit per node [%d] considering"
+ " overload factor [%.2f] based on capacity [%d]",
+ " overload factor [%.2f] and flat skew [%d] based on capacity [%d]",
nodeShardCount,
limit,
skewFactor,
flatSkew,
provisionedCapacity
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ public void apply(Settings value, Settings current, Settings previous) {
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED,
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW,
Expand Down
Loading