Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Aug 13, 2024
1 parent fa0ad4e commit 474060e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
Expand Down Expand Up @@ -91,7 +90,7 @@ protected void allocateUnassignedBatchOnTimeout(Set<ShardId> shardIds, RoutingAl
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() == primary && shardIds.contains(unassignedShard.shardId())) {
if (isResponsibleFor(unassignedShard, primary) == false) {
if (isResponsibleFor(unassignedShard) == false) {
continue;
}
allocationDecision = AllocateUnassignedDecision.throttle(null);
Expand All @@ -103,20 +102,7 @@ protected void allocateUnassignedBatchOnTimeout(Set<ShardId> shardIds, RoutingAl
/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected static boolean isResponsibleFor(final ShardRouting shard, boolean primary) {
if (primary) {
return shard.primary() // must be primary
&& shard.unassigned() // must be unassigned
// only handle either an existing store or a snapshot recovery
&& (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
|| shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT);
} else {
return shard.primary() == false // must be a replica
&& shard.unassigned() // must be unassigned
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
&& shard.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED;
}
}
protected abstract boolean isResponsibleFor(ShardRouting shardRouting);

protected void executeDecision(
ShardRouting shardRouting,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@
* @opensearch.internal
*/
public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected boolean isResponsibleFor(final ShardRouting shard) {
return shard.primary() // must be primary
&& shard.unassigned() // must be unassigned
// only handle either an existing store or a snapshot recovery
&& (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
|| shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT);
}

/**
* Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is
Expand All @@ -89,7 +99,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
* @return allocation decision taken for this shard
*/
protected AllocateUnassignedDecision getInEligibleShardDecision(ShardRouting unassignedShard, RoutingAllocation allocation) {
if (isResponsibleFor(unassignedShard, true) == false) {
if (isResponsibleFor(unassignedShard) == false) {
// this allocator is not responsible for allocating this shard
return AllocateUnassignedDecision.NOT_TAKEN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,23 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
}
}

/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected boolean isResponsibleFor(final ShardRouting shard) {
return shard.primary() == false // must be a replica
&& shard.unassigned() // must be unassigned
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
&& shard.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED;
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(
final ShardRouting unassignedShard,
final RoutingAllocation allocation,
final Logger logger
) {
if (isResponsibleFor(unassignedShard, false) == false) {
if (isResponsibleFor(unassignedShard) == false) {
// this allocator is not responsible for deciding on this shard
return AllocateUnassignedDecision.NOT_TAKEN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
RoutingAllocation allocation,
Supplier<Map<DiscoveryNode, StoreFilesMetadata>> nodeStoreFileMetaDataMapSupplier
) {
if (!isResponsibleFor(shardRouting, false)) {
if (isResponsibleFor(shardRouting) == false) {
return AllocateUnassignedDecision.NOT_TAKEN;
}
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);
Expand Down

0 comments on commit 474060e

Please sign in to comment.