From 05897cc90ed22add4afd3325f3090e89f1af324d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 28 Mar 2023 01:20:07 +0000 Subject: [PATCH] Add condition to check if backpressure is enabled. Signed-off-by: Rishikesh1159 --- .../SegmentReplicationPressureService.java | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index b1e4c0387558a..467134a2fc0cd 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -202,37 +202,39 @@ protected boolean mustReschedule() { @Override protected void runInternal() { - final SegmentReplicationStats stats = tracker.getStats(); - for (Map.Entry entry : stats.getShardStats().entrySet()) { - final Set staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats()); - final ShardId shardId = entry.getKey(); - final IndexService indexService = indicesService.indexService(shardId.getIndex()); - final IndexShard primaryShard = indexService.getShard(shardId.getId()); - for (SegmentReplicationShardStats staleReplica : staleReplicas) { - if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) { - shardStateAction.remoteShardFailed( - shardId, - staleReplica.getAllocationId(), - primaryShard.getOperationPrimaryTerm(), - true, - "replica too far behind primary, marking as stale", - null, - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.trace( - "Successfully failed remote shardId [{}] allocation id [{}]", - shardId, - staleReplica.getAllocationId() - ); + if (isSegmentReplicationBackpressureEnabled) { + final SegmentReplicationStats stats = tracker.getStats(); + for (Map.Entry entry : stats.getShardStats().entrySet()) { + final Set staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats()); + final ShardId shardId = entry.getKey(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard primaryShard = indexService.getShard(shardId.getId()); + for (SegmentReplicationShardStats staleReplica : staleReplicas) { + if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) { + shardStateAction.remoteShardFailed( + shardId, + staleReplica.getAllocationId(), + primaryShard.getOperationPrimaryTerm(), + true, + "replica too far behind primary, marking as stale", + null, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.trace( + "Successfully failed remote shardId [{}] allocation id [{}]", + shardId, + staleReplica.getAllocationId() + ); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to send remote shard failure", e); + } } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to send remote shard failure", e); - } - } - ); + ); + } } } }