diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 914b57e30e1e5..d0742e399de8e 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -13,6 +13,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -27,8 +28,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Comparator; import java.util.Set; -import java.util.Map; import java.util.stream.Collectors; /** @@ -209,59 +210,54 @@ protected boolean mustReschedule() { protected void runInternal() { if (pressureService.isSegmentReplicationBackpressureEnabled) { final SegmentReplicationStats stats = pressureService.tracker.getStats(); - long highestCurrentReplicationTimeMillis = 0; - ShardId shardIdWithHighestCurrentReplicationTime = null; // Find the shardId in node which is having stale replicas with highest current replication time. // This way we only fail one shardId's stale replicas in every iteration of this background async task and there by decrease // load gradually on node. - for (Map.Entry entry : stats.getShardStats().entrySet()) { - final Set staleReplicas = pressureService.getStaleReplicas( - entry.getValue().getReplicaStats() - ); - for (SegmentReplicationShardStats staleReplica : staleReplicas) { - if (staleReplica.getCurrentReplicationTimeMillis() > highestCurrentReplicationTimeMillis) { - shardIdWithHighestCurrentReplicationTime = entry.getKey(); - } - } - } - - // Fail the stale replicas of shardId having highest current replication time - if (shardIdWithHighestCurrentReplicationTime != null) { - final Set staleReplicas = pressureService.getStaleReplicas( - stats.getShardStats().get(shardIdWithHighestCurrentReplicationTime).getReplicaStats() - ); - final ShardId shardId = shardIdWithHighestCurrentReplicationTime; - final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex()); - final IndexShard primaryShard = indexService.getShard(shardId.getId()); - for (SegmentReplicationShardStats staleReplica : staleReplicas) { - if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) { - pressureService.shardStateAction.remoteShardFailed( - shardId, - staleReplica.getAllocationId(), - primaryShard.getOperationPrimaryTerm(), - true, - "replica too far behind primary, marking as stale", - null, - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.trace( - "Successfully failed remote shardId [{}] allocation id [{}]", - shardId, - staleReplica.getAllocationId() - ); - } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to send remote shard failure", e); + stats.getShardStats() + .entrySet() + .stream() + .flatMap( + entry -> pressureService.getStaleReplicas(entry.getValue().getReplicaStats()) + .stream() + .map(r -> Tuple.tuple(entry.getKey(), r.getCurrentReplicationTimeMillis())) + ) + .max(Comparator.comparingLong(Tuple::v2)) + .map(Tuple::v1) + .ifPresent(shardId -> { + final Set staleReplicas = pressureService.getStaleReplicas( + stats.getShardStats().get(shardId).getReplicaStats() + ); + final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex()); + final IndexShard primaryShard = indexService.getShard(shardId.getId()); + for (SegmentReplicationShardStats staleReplica : staleReplicas) { + if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) { + pressureService.shardStateAction.remoteShardFailed( + shardId, + staleReplica.getAllocationId(), + primaryShard.getOperationPrimaryTerm(), + true, + "replica too far behind primary, marking as stale", + null, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.trace( + "Successfully failed remote shardId [{}] allocation id [{}]", + shardId, + staleReplica.getAllocationId() + ); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to send remote shard failure", e); + } } - } - ); + ); + } } - } - } + }); } }