Skip to content

Commit

Permalink
Add condition to check if backpressure is enabled.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Mar 28, 2023
1 parent bf9b3dc commit 05897cc
Showing 1 changed file with 32 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,37 +202,39 @@ protected boolean mustReschedule() {

@Override
protected void runInternal() {
final SegmentReplicationStats stats = tracker.getStats();
for (Map.Entry<ShardId, SegmentReplicationPerGroupStats> entry : stats.getShardStats().entrySet()) {
final Set<SegmentReplicationShardStats> staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats());
final ShardId shardId = entry.getKey();
final IndexService indexService = indicesService.indexService(shardId.getIndex());
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) {
shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
primaryShard.getOperationPrimaryTerm(),
true,
"replica too far behind primary, marking as stale",
null,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.trace(
"Successfully failed remote shardId [{}] allocation id [{}]",
shardId,
staleReplica.getAllocationId()
);
if (isSegmentReplicationBackpressureEnabled) {
final SegmentReplicationStats stats = tracker.getStats();
for (Map.Entry<ShardId, SegmentReplicationPerGroupStats> entry : stats.getShardStats().entrySet()) {
final Set<SegmentReplicationShardStats> staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats());
final ShardId shardId = entry.getKey();
final IndexService indexService = indicesService.indexService(shardId.getIndex());
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) {
shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
primaryShard.getOperationPrimaryTerm(),
true,
"replica too far behind primary, marking as stale",
null,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.trace(
"Successfully failed remote shardId [{}] allocation id [{}]",
shardId,
staleReplica.getAllocationId()
);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to send remote shard failure", e);
}
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to send remote shard failure", e);
}
}
);
);
}
}
}
}
Expand Down

0 comments on commit 05897cc

Please sign in to comment.