diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java index 8a14ce3f1a288..467e1df759184 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -43,6 +43,8 @@ public final class RemoteShardsBalancer extends ShardsBalancer { private final Logger logger; private final RoutingAllocation allocation; private final RoutingNodes routingNodes; + // indicates if there are any nodes being throttled for allocating any unassigned shards + private boolean anyNodesThrottled = false; public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) { this.logger = logger; @@ -323,12 +325,16 @@ private void allocateUnassignedReplicas(Queue nodeQueue, Map unassignedShardMap) { + // If any nodes are throttled during allocation, mark all remaining unassigned shards as THROTTLED + final UnassignedInfo.AllocationStatus status = anyNodesThrottled + ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED + : UnassignedInfo.AllocationStatus.DECIDERS_NO; for (UnassignedIndexShards indexShards : unassignedShardMap.values()) { for (ShardRouting shard : indexShards.getPrimaries()) { - routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); } for (ShardRouting shard : indexShards.getReplicas()) { - routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); } } } @@ -389,11 +395,11 @@ private void allocateUnassignedShards( private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouting shard) { boolean allocated = false; boolean throttled = false; - Set nodesCheckedForShard = new HashSet<>(); + int numNodesToCheck = nodeQueue.size(); while (!nodeQueue.isEmpty()) { RoutingNode node = nodeQueue.poll(); + --numNodesToCheck; Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation); - nodesCheckedForShard.add(node.nodeId()); if (allocateDecision.type() == Decision.Type.YES) { if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId()); @@ -432,6 +438,10 @@ private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouti } nodeQueue.offer(node); } else { + if (nodeLevelDecision.type() == Decision.Type.THROTTLE) { + anyNodesThrottled = true; + } + if (logger.isTraceEnabled()) { logger.trace( "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", @@ -443,14 +453,14 @@ private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouti } // Break out if all nodes in the queue have been checked for this shard - if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + if (numNodesToCheck == 0) { break; } } } if (!allocated) { - UnassignedInfo.AllocationStatus status = throttled + UnassignedInfo.AllocationStatus status = (throttled || anyNodesThrottled) ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED : UnassignedInfo.AllocationStatus.DECIDERS_NO; routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index a1db6cd83ab6c..6a03a1f79bcde 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -229,6 +229,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.ALWAYS; } } + + @Override + public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) { + return throttle ? Decision.THROTTLE : Decision.YES; + } }); Collections.shuffle(deciders, random()); return new AllocationDeciders(deciders);