Skip to content

Commit

Permalink
BaseGatewayShardAllocator changes for Assigning the batch of shards (#…
Browse files Browse the repository at this point in the history
…8776) (#12773)

* BaseGatewayShardAllocator changes for Assigning the batch of shards



(cherry picked from commit ef50fb4)

Signed-off-by: Gaurav Chandani <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Aman Khare <[email protected]>
  • Loading branch information
3 people authored Mar 21, 2024
1 parent b0cdc48 commit 41d11e1
Showing 1 changed file with 63 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -45,7 +46,9 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
Expand All @@ -64,8 +67,9 @@ public abstract class BaseGatewayShardAllocator {
* Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist.
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
* to make decisions on assigning shards to nodes.
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
*
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
* @param unassignedAllocationHandler handles the allocation of the current shard
*/
public void allocateUnassigned(
Expand All @@ -74,7 +78,46 @@ public void allocateUnassigned(
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
+ "some shards";
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
try {
if (decisionMap.isEmpty() == false) {
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
}
} else {
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
break;
}
} catch (Exception e) {
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
throw e;
}
}
}

private void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
if (allocateUnassignedDecision.isDecisionTaken() == false) {
// no decision was taken by this allocator
return;
Expand Down Expand Up @@ -109,9 +152,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions
* about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated.
*
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision
*/
public abstract AllocateUnassignedDecision makeAllocationDecision(
Expand All @@ -120,6 +163,21 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> unassignedShardBatch,
RoutingAllocation allocation,
Logger logger
) {

return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
.collect(
Collectors.toMap(
unassignedShard -> unassignedShard,
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
)
);
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down

0 comments on commit 41d11e1

Please sign in to comment.