Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
1. Made changes so that Allocation Service run only default implementation of batch mode
2. Renamed methods
3. Added and modified documenatation

Signed-off-by: Gaurav Chandani <[email protected]>
  • Loading branch information
Gaurav614 committed Jan 12, 2024
1 parent b1994fa commit 257f36d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
Expand Down Expand Up @@ -201,9 +200,9 @@ private ClusterState buildResult(ClusterState oldState, RoutingAllocation alloca
if (restoreInProgress != null) {
RestoreInProgress updatedRestoreInProgress = allocation.updateRestoreInfoWithRoutingChanges(restoreInProgress);
if (updatedRestoreInProgress != restoreInProgress) {
ImmutableOpenMap.Builder<String, ClusterState.Custom> customsBuilder = ImmutableOpenMap.builder(allocation.getCustoms());
final Map<String, ClusterState.Custom> customsBuilder = new HashMap<>(allocation.getCustoms());
customsBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
newStateBuilder.customs(customsBuilder.build());
newStateBuilder.customs(customsBuilder);
}
}
return newStateBuilder.build();
Expand Down Expand Up @@ -566,30 +565,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
existingShardsAllocator.beforeAllocation(allocation);
}

/*
Use batch mode if enabled and there is no custom allocator set for Allocation service
*/
Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings);

if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) {
// since allocators is per index setting, to have batch assignment verify allocators same for all shards
// if not fallback to single assignment
ExistingShardsAllocator allocator = getAndVerifySameAllocatorForAllUnassignedShards(allocation);
if (allocator != null) {
// use batch mode implementation of GatewayAllocator
if (allocator.getClass() == GatewayAllocator.class) {
allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
}

allocator.allocateUnassignedBatch(allocation, true);
for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
}
allocator.allocateUnassignedBatch(allocation, false);
return;
} else {
// it means though batch mode is enabled but some indices have custom allocator set and we cant do Batch recover in that
// case fallback to single assignment and
logger.debug("Batch mode is enabled but some indices have custom allocator set. Falling back to single assignment");
}
}
if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT) && existingShardsAllocators.size() == 2) {
/*
If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator
Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards
*/
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
allocator.afterPrimariesBeforeReplicas(allocation);
allocator.allocateAllUnassignedShards(allocation, false);
return;
}
logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set");

final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
while (primaryIterator.hasNext()) {
Expand All @@ -612,32 +603,6 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
}
}

/**
* Verify if all unassigned shards are allocated by the same allocator, if yes then return the allocator, else
* return null
* @param allocation {@link RoutingAllocation}
* @return {@link ExistingShardsAllocator} or null
*/
private ExistingShardsAllocator getAndVerifySameAllocatorForAllUnassignedShards(RoutingAllocation allocation) {
// if there is a single Allocator set in Allocation Service then use it for all shards
if (existingShardsAllocators.size() == 1) {
return existingShardsAllocators.values().iterator().next();
}
RoutingNodes.UnassignedShards unassignedShards = allocation.routingNodes().unassigned();
RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator();
ExistingShardsAllocator currentAllocatorForShard = null;
if (unassignedShards.size() > 0) {
currentAllocatorForShard = getAllocatorForShard(iterator.next(), allocation);
while (iterator.hasNext()) {
ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation);
if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName()) == false) {
return null;
}
}
}
return currentAllocatorForShard;
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext();) {
RoutingNode node = it.next();
Expand Down Expand Up @@ -677,9 +642,9 @@ private void applyStartedShards(RoutingAllocation routingAllocation, List<ShardR
: "shard started for unknown index (shard entry: " + startedShard + ")";
assert startedShard == routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId())
: "shard routing to start does not exist in routing table, expected: "
+ startedShard
+ " but was: "
+ routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());
+ startedShard
+ " but was: "
+ routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());

routingNodes.startShard(logger, startedShard, routingAllocation.changes());
}
Expand Down Expand Up @@ -739,7 +704,7 @@ private ExistingShardsAllocator getAllocatorForShard(ShardRouting shardRouting,
final String allocatorName = ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.get(
routingAllocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
);
final ExistingShardsAllocator existingShardsAllocator = existingShardsAllocators.get(allocatorName);
ExistingShardsAllocator existingShardsAllocator = existingShardsAllocators.get(allocatorName);
return existingShardsAllocator != null ? existingShardsAllocator : new NotFoundAllocator(allocatorName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.List;

/**
* Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is
* {@link GatewayAllocator}, but plugins can supply their own implementations too.
* {@link GatewayAllocator} and {@link ShardsBatchGatewayAllocator}, but plugins can supply their own implementations too.
*
* @opensearch.internal
*/
Expand All @@ -66,20 +67,21 @@ public interface ExistingShardsAllocator {
* in one or more go.
*
* Enable this setting if your ExistingShardAllocator is implementing the
* {@link ExistingShardsAllocator#allocateUnassignedBatch(RoutingAllocation, boolean)} method.
* {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method.
* The default implementation of this method is not optimized and assigns shards one by one.
*
* If enable to true then it expects all indices of the shard to use same {@link ExistingShardsAllocator}, otherwise
* Allocation Service will fallback to default implementation i.e. {@link ExistingShardsAllocator#allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)}
*
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
* {@link GatewayAllocator}
* {@link ShardsBatchGatewayAllocator}. Right now even if plugin implements it, AllocationService will run the
* default implementation to enable Batch mode of assignment
*
* TODO: Currently its implementation is WIP for GatewayAllocator so setting enabling wont have any effect
* https://github.com/opensearch-project/OpenSearch/issues/5098
*/
Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
"cluster.allocator.existing_shards_allocator.batch_enable",
"cluster.allocator.existing_shards_allocator.batch_enabled",
false,
Setting.Property.NodeScope
);
Expand Down Expand Up @@ -108,8 +110,10 @@ void allocateUnassigned(
* Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible.
* Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard
* and is kept here for backward compatibility.
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) {
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
Expand Down

0 comments on commit 257f36d

Please sign in to comment.