Skip to content

Commit

Permalink
Fix for bug showing incorrect awareness attributes count in Awareness…
Browse files Browse the repository at this point in the history
…AllocationDecider (#3428) (#3580)

* Fix for bug showing incorrect awareness attributes count in AwarenessAllocationDecider

Signed-off-by: Anshu Agarwal <[email protected]>
(cherry picked from commit 2f9f8e1)

Co-authored-by: Anshu Agarwal <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and anshu1106 authored Jun 15, 2022
1 parent f69c34c commit 0a5bf16
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
package org.opensearch.cluster.routing.allocation.decider;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -207,12 +210,14 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout

int numberOfAttributes = nodesPerAttribute.size();
List<String> fullValues = forcedAwarenessAttributes.get(awarenessAttribute);

if (fullValues != null) {
for (String fullValue : fullValues) {
if (shardPerAttribute.containsKey(fullValue) == false) {
numberOfAttributes++;
}
// If forced awareness is enabled, numberOfAttributes = count(distinct((union(discovered_attributes, forced_attributes)))
Set<String> attributesSet = new HashSet<>(fullValues);
for (ObjectCursor<String> stringObjectCursor : nodesPerAttribute.keys()) {
attributesSet.add(stringObjectCursor.value);
}
numberOfAttributes = attributesSet.size();
}
// TODO should we remove ones that are not part of full list?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,32 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -971,4 +980,87 @@ public void testMultipleAwarenessAttributes() {
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
}

public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() {
Settings settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

AllocationService strategy = createAllocationService(settings);

logger.info("Building initial routing table for 'testAllocationExplainForUnassignedShardsWithUnbalancedZones'");
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(initialRoutingTable)
.build();

logger.info("--> adding 3 nodes in different zones and do rerouting");
clusterState = ClusterState.builder(clusterState)
.nodes(
DiscoveryNodes.builder()
.add(newNode("A-0", singletonMap("zone", "a")))
.add(newNode("A-1", singletonMap("zone", "a")))
.add(newNode("B-0", singletonMap("zone", "b")))
)
.build();
clusterState = strategy.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));

logger.info("--> start the shard (primary)");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
// One Shard is unassigned due to forced zone awareness
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));

List<ShardRouting> unassignedShards = clusterState.getRoutingTable().shardsWithState(UNASSIGNED);

ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
// Add a new node in zone c
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("C-0", singletonMap("zone", "c"))))
.build();

final AwarenessAllocationDecider decider = new AwarenessAllocationDecider(settings, EMPTY_CLUSTER_SETTINGS);

final RoutingAllocation allocation = new RoutingAllocation(
new AllocationDeciders(Collections.singleton(decider)),
clusterState.getRoutingNodes(),
clusterState,
null,
null,
0L
);
allocation.debugDecision(true);

Decision decision = null;
RoutingNodes nodes = clusterState.getRoutingNodes();

for (RoutingNode node : nodes) {
// Try to allocate unassigned shard to A-0, fails because of forced zone awareness
if (node.nodeId().equals("A-0")) {
decision = decider.canAllocate(unassignedShards.get(0), node, allocation);
assertEquals(Decision.Type.NO, decision.type());
assertEquals(
decision.getExplanation(),
"there are too many copies of the shard allocated to nodes with attribute"
+ " [zone], there are [3] total configured shard copies for this shard id and [3]"
+ " total attribute values, expected the allocated shard count per attribute [2] to"
+ " be less than or equal to the upper bound of the required number of shards per attribute [1]"
);
}

}
}
}

0 comments on commit 0a5bf16

Please sign in to comment.