Skip to content

Commit

Permalink
Fixed bugs in filtering the shard to execute decision on
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <[email protected]>
  • Loading branch information
Gaurav614 committed Sep 6, 2023
1 parent 37796c1 commit 20e11a4
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -82,21 +83,44 @@ public void allocateUnassigned(

public void allocateUnassignedBatch(Set<ShardRouting> shards, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
ConcurrentMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shards, allocation, logger);
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shards, allocation, logger);
assert shards.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for " + "some shards";
// get all unassigned shards
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()){

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
if (shards.contains(shard)) {
executeDecision(shard, decisionMap.get(shard), allocation, iterator);
try {
if (decisionMap.isEmpty() == false) {
if (shards.stream()
.filter(shardRouting -> shardRouting.shardId().equals(shard.shardId()) && shardRouting.primary() == shard.primary())
.count() == 1) {
List<ShardRouting> matchedShardRouting = decisionMap.keySet()
.stream()
.filter(
shardRouting -> shardRouting.shardId().equals(shard.shardId()) && shardRouting.primary() == shard.primary()
)
.collect(Collectors.toList());
if (matchedShardRouting.size() == 1) {
executeDecision(shard, decisionMap.remove(matchedShardRouting.get(0)), allocation, iterator);
} else if (matchedShardRouting.size() > 1) {
// Adding this just to check the behaviour if we ever land up here.
throw new IllegalStateException("decision map must have single entry for 1 shard");
}
}
}
} catch (Exception e) {
logger.error("failed to execute decision for shard {} ", shard, e);
}
}
}

private void executeDecision(ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
private void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
if (allocateUnassignedDecision.isDecisionTaken() == false) {
// no decision was taken by this allocator
return;
Expand All @@ -113,6 +137,7 @@ private void executeDecision(ShardRouting shardRouting,
unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
}
}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down Expand Up @@ -141,10 +166,10 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public abstract ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
public abstract HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -460,7 +460,11 @@ private static NodesToAllocate buildNodesToAllocate(

@Override
// to be override
public ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(Set<ShardRouting> shards, RoutingAllocation allocation, Logger logger) {
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

Expand Down Expand Up @@ -498,11 +497,14 @@ private static boolean canPerformOperationBasedRecovery(

@Override
// to be override
public ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(Set<ShardRouting> shards, RoutingAllocation allocation, Logger logger) {
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
return null;
}


/**
* Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard.
*/
Expand Down

0 comments on commit 20e11a4

Please sign in to comment.