Skip to content

Commit

Permalink
Avoid unnecesaary iteration
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Jul 23, 2024
1 parent f5501fa commit d214eec
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public void run() {
"Time taken to execute timed runnables in this cycle:[{}ms]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
);
onComplete();
}

public void onComplete(){}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.core.index.shard.ShardId;

import java.util.ArrayList;
import java.util.HashSet;
Expand Down Expand Up @@ -81,13 +82,12 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

protected void allocateUnassignedBatchOnTimeout(List<ShardRouting> shardRoutings, RoutingAllocation allocation, boolean primary) {
Set<ShardRouting> batchShardRoutingSet = new HashSet<>(shardRoutings);
protected void allocateUnassignedBatchOnTimeout(Set<ShardId> shardRoutingIds, RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() == primary && batchShardRoutingSet.contains(unassignedShard)) {
if (unassignedShard.primary() == primary && shardRoutingIds.contains(unassignedShard.shardId())) {
allocationDecision = AllocateUnassignedDecision.throttle(null);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
private final TransportNodesListGatewayStartedShardsBatch batchStartedAction;
private final TransportNodesListShardStoreMetadataBatch batchStoreAction;

private Set<ShardId> timedOutPrimaryShardIds;
private Set<ShardId> timedOutReplicaShardIds;

@Inject
public ShardsBatchGatewayAllocator(
RerouteService rerouteService,
Expand Down Expand Up @@ -245,24 +248,26 @@ protected BatchRunnableExecutor innerAllocateUnassignedBatch(
}
List<TimeoutAwareRunnable> runnables = new ArrayList<>();
if (primary) {
timedOutPrimaryShardIds = new HashSet<>();
batchIdToStartedShardBatch.values()
.stream()
.filter(batch -> batchesToAssign.contains(batch.batchId))
.forEach(shardsBatch -> runnables.add(new TimeoutAwareRunnable() {
@Override
public void onTimeout() {
long startTime = System.nanoTime();
primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(
shardsBatch.getBatchedShardRoutings(),
allocation,
true
);
logger.info(
"Time taken to execute allocateUnassignedBatchOnTimeout for unassigned primary batch with id [{}], size : [{}] in this cycle:[{}ms]",
shardsBatch.batchId,
shardsBatch.getBatchedShardRoutings().size(),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
);
// long startTime = System.nanoTime();
// primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(
// shardsBatch.getBatchedShardRoutings(),
// allocation,
// true
// );
// logger.info(
// "Time taken to execute allocateUnassignedBatchOnTimeout for unassigned primary batch with id [{}], size : [{}] in this cycle:[{}ms]",
// shardsBatch.batchId,
// shardsBatch.getBatchedShardRoutings().size(),
// TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
// );
timedOutPrimaryShardIds.addAll(shardsBatch.getBatchedShards());
}

@Override
Expand All @@ -278,22 +283,29 @@ public void run() {

}
}));
return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout);
return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout) {
@Override
public void onComplete() {
primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true);
}
};
} else {
timedOutReplicaShardIds = new HashSet<>();
batchIdToStoreShardBatch.values()
.stream()
.filter(batch -> batchesToAssign.contains(batch.batchId))
.forEach(batch -> runnables.add(new TimeoutAwareRunnable() {
@Override
public void onTimeout() {
long startTime = System.nanoTime();
replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(batch.getBatchedShardRoutings(), allocation, false);
logger.info(
"Time taken to execute allocateUnassignedBatchOnTimeout for unassigned replica batch with id [{}], size : [{}] in this cycle:[{}ms]",
batch.batchId,
batch.getBatchedShardRoutings().size(),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
);
// long startTime = System.nanoTime();
// replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(batch.getBatchedShardRoutings(), allocation, false);
// logger.info(
// "Time taken to execute allocateUnassignedBatchOnTimeout for unassigned replica batch with id [{}], size : [{}] in this cycle:[{}ms]",
// batch.batchId,
// batch.getBatchedShardRoutings().size(),
// TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
// );
timedOutReplicaShardIds.addAll(batch.getBatchedShards());

}

Expand All @@ -310,7 +322,12 @@ public void run() {

}
}));
return new BatchRunnableExecutor(runnables, () -> replicaShardsBatchGatewayAllocatorTimeout);
return new BatchRunnableExecutor(runnables, () -> replicaShardsBatchGatewayAllocatorTimeout) {
@Override
public void onComplete() {
replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false);
}
};
}
}

Expand Down

0 comments on commit d214eec

Please sign in to comment.