Skip to content

Commit

Permalink
Compute lag early
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Apr 25, 2023
1 parent 71cf169 commit 893b546
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.index.shard.ShardId;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -35,6 +36,10 @@ public class RemoteSegmentUploadShardStatsTracker implements Writeable {

public static final int UPLOAD_TIME_WINDOW_SIZE = 2000;

private final AtomicLong seqNoLag = new AtomicLong();

private final AtomicLong timeMsLag = new AtomicLong();

private final AtomicLong localRefreshSeqNo = new AtomicLong();

private final AtomicLong localRefreshTime = new AtomicLong();
Expand Down Expand Up @@ -67,7 +72,7 @@ public class RemoteSegmentUploadShardStatsTracker implements Writeable {
/**
* Keeps list of filename of the most recent segments uploaded as part of refresh.
*/
private volatile Set<String> latestUploadFiles;
private volatile Set<String> latestUploadFiles = new HashSet<>();

private final Streak failures = new Streak();

Expand Down Expand Up @@ -209,10 +214,12 @@ public long getLocalRefreshTime() {

public void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
this.localRefreshSeqNo.set(localRefreshSeqNo);
computeSeqNoLag();
}

public void updateLocalRefreshTime(long localRefreshTime) {
this.localRefreshTime.set(localRefreshTime);
computeTimeLag();
}

public long getRemoteRefreshSeqNo() {
Expand All @@ -221,6 +228,7 @@ public long getRemoteRefreshSeqNo() {

public void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) {
this.remoteRefreshSeqNo.set(remoteRefreshSeqNo);
computeSeqNoLag();
}

public long getRemoteRefreshTime() {
Expand All @@ -229,14 +237,23 @@ public long getRemoteRefreshTime() {

public void updateRemoteRefreshTime(long remoteRefreshTime) {
this.remoteRefreshTime.set(remoteRefreshTime);
computeTimeLag();
}

private void computeSeqNoLag() {
seqNoLag.set(localRefreshSeqNo.get() - remoteRefreshSeqNo.get());
}

public long getSeqNoLag() {
return localRefreshSeqNo.get() - remoteRefreshSeqNo.get();
return seqNoLag.get();
}

private void computeTimeLag() {
timeMsLag.set(localRefreshTime.get() - remoteRefreshTime.get());
}

public long getTimeLag() {
return localRefreshTime.get() - remoteRefreshTime.get();
return timeMsLag.get();
}

public Map<String, Long> getLatestLocalFileNameLengthMap() {
Expand All @@ -245,12 +262,16 @@ public Map<String, Long> getLatestLocalFileNameLengthMap() {

public void updateLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
computeBytesLag();
}

public void updateLatestUploadFiles(Set<String> latestUploadFiles) {
this.latestUploadFiles = latestUploadFiles;
this.latestUploadFiles = new HashSet<>(latestUploadFiles);
computeBytesLag();
}

private final AtomicLong bytesLag = new AtomicLong();

public int getConsecutiveFailureCount() {
return failures.length();
}
Expand Down Expand Up @@ -291,15 +312,20 @@ public double getUploadTimeAverage() {
return uploadTimeMovingAverage.getAverage();
}

public long getBytesLag() {
private void computeBytesLag() {
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
return 0;
return;
}
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()
.stream()
.filter(f -> latestUploadFiles == null || latestUploadFiles.contains(f) == false)
.filter(f -> !latestUploadFiles.contains(f))
.collect(Collectors.toSet());
return filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum();
long bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum();
this.bytesLag.set(bytesLag);
}

public long getBytesLag() {
return bytesLag.get();
}

public long getInflightUploadBytes() {
Expand All @@ -324,5 +350,6 @@ public void updateUploadTimeMovingAverageWindowSize(int updatedSize) {

public void addToLatestUploadFiles(String file) {
this.latestUploadFiles.add(file);
computeBytesLag();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public RemoteSegmentUploadShardStatsTracker getStatsTracker(ShardId shardId) {

@Override
public void afterIndexShardCreated(IndexShard indexShard) {
logger.info("creating tracker for shard={}", indexShard);
RemoteUploadStatsTracker.INSTANCE.createStatsTracker(
indexShard.shardId(),
remoteUploadPressureSettings.getUploadBytesMovingAverageWindowSize(),
Expand All @@ -52,7 +53,8 @@ public void afterIndexShardCreated(IndexShard indexShard) {
}

@Override
public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
logger.info("deleting tracker for shard={}", indexShard);
RemoteUploadStatsTracker.INSTANCE.remove(shardId);
}

Expand Down

0 comments on commit 893b546

Please sign in to comment.