Skip to content

Commit

Permalink
Addressing comments on PR.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Mar 30, 2023
1 parent 282ca4f commit 04481da
Showing 1 changed file with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public SegmentReplicationPressureService(
this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas);

this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(TimeValue.timeValueSeconds(30));
this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this, TimeValue.timeValueSeconds(30));
}

public void isSegrepLimitBreached(ShardId shardId) {
Expand Down Expand Up @@ -188,10 +188,13 @@ public void close() throws IOException {
}

// Background Task to fail replica shards if they are too far behind primary shard.
final class AsyncFailStaleReplicaTask extends AbstractAsyncTask {
final static class AsyncFailStaleReplicaTask extends AbstractAsyncTask {

AsyncFailStaleReplicaTask(TimeValue interval) {
super(logger, threadPool, interval, true);
final SegmentReplicationPressureService pressureService;

AsyncFailStaleReplicaTask(SegmentReplicationPressureService pressureService, TimeValue interval) {
super(logger, pressureService.threadPool, interval, true);
this.pressureService = pressureService;
rescheduleIfNecessary();
}

Expand All @@ -202,16 +205,18 @@ protected boolean mustReschedule() {

@Override
protected void runInternal() {
if (isSegmentReplicationBackpressureEnabled) {
final SegmentReplicationStats stats = tracker.getStats();
if (pressureService.isSegmentReplicationBackpressureEnabled) {
final SegmentReplicationStats stats = pressureService.tracker.getStats();
for (Map.Entry<ShardId, SegmentReplicationPerGroupStats> entry : stats.getShardStats().entrySet()) {
final Set<SegmentReplicationShardStats> staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats());
final Set<SegmentReplicationShardStats> staleReplicas = pressureService.getStaleReplicas(
entry.getValue().getReplicaStats()
);
final ShardId shardId = entry.getKey();
final IndexService indexService = indicesService.indexService(shardId.getIndex());
final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex());
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) {
shardStateAction.remoteShardFailed(
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) {
pressureService.shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
primaryShard.getOperationPrimaryTerm(),
Expand Down Expand Up @@ -240,6 +245,11 @@ public void onFailure(Exception e) {
}
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "fail_stale_replica";
Expand Down

0 comments on commit 04481da

Please sign in to comment.