Skip to content

Commit

Permalink
Address comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Apr 4, 2023
1 parent c332c15 commit 54bae01
Showing 1 changed file with 44 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
Expand All @@ -27,8 +28,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.Set;
import java.util.Map;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -209,59 +210,54 @@ protected boolean mustReschedule() {
protected void runInternal() {
if (pressureService.isSegmentReplicationBackpressureEnabled) {
final SegmentReplicationStats stats = pressureService.tracker.getStats();
long highestCurrentReplicationTimeMillis = 0;
ShardId shardIdWithHighestCurrentReplicationTime = null;

// Find the shardId in node which is having stale replicas with highest current replication time.
// This way we only fail one shardId's stale replicas in every iteration of this background async task and there by decrease
// load gradually on node.
for (Map.Entry<ShardId, SegmentReplicationPerGroupStats> entry : stats.getShardStats().entrySet()) {
final Set<SegmentReplicationShardStats> staleReplicas = pressureService.getStaleReplicas(
entry.getValue().getReplicaStats()
);
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > highestCurrentReplicationTimeMillis) {
shardIdWithHighestCurrentReplicationTime = entry.getKey();
}
}
}

// Fail the stale replicas of shardId having highest current replication time
if (shardIdWithHighestCurrentReplicationTime != null) {
final Set<SegmentReplicationShardStats> staleReplicas = pressureService.getStaleReplicas(
stats.getShardStats().get(shardIdWithHighestCurrentReplicationTime).getReplicaStats()
);
final ShardId shardId = shardIdWithHighestCurrentReplicationTime;
final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex());
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) {
pressureService.shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
primaryShard.getOperationPrimaryTerm(),
true,
"replica too far behind primary, marking as stale",
null,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.trace(
"Successfully failed remote shardId [{}] allocation id [{}]",
shardId,
staleReplica.getAllocationId()
);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to send remote shard failure", e);
stats.getShardStats()
.entrySet()
.stream()
.flatMap(
entry -> pressureService.getStaleReplicas(entry.getValue().getReplicaStats())
.stream()
.map(r -> Tuple.tuple(entry.getKey(), r.getCurrentReplicationTimeMillis()))
)
.max(Comparator.comparingLong(Tuple::v2))
.map(Tuple::v1)
.ifPresent(shardId -> {
final Set<SegmentReplicationShardStats> staleReplicas = pressureService.getStaleReplicas(
stats.getShardStats().get(shardId).getReplicaStats()
);
final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex());
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) {
pressureService.shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
primaryShard.getOperationPrimaryTerm(),
true,
"replica too far behind primary, marking as stale",
null,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.trace(
"Successfully failed remote shardId [{}] allocation id [{}]",
shardId,
staleReplica.getAllocationId()
);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to send remote shard failure", e);
}
}
}
);
);
}
}
}
}
});
}
}

Expand Down

0 comments on commit 54bae01

Please sign in to comment.