Skip to content

Commit

Permalink
Compute lag early
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Apr 25, 2023
1 parent 71cf169 commit c2de3ef
Showing 1 changed file with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public class RemoteSegmentUploadShardStatsTracker implements Writeable {

public static final int UPLOAD_TIME_WINDOW_SIZE = 2000;

private final AtomicLong seqNoLag = new AtomicLong();

private final AtomicLong timeMsLag = new AtomicLong();

private final AtomicLong localRefreshSeqNo = new AtomicLong();

private final AtomicLong localRefreshTime = new AtomicLong();
Expand Down Expand Up @@ -209,10 +213,12 @@ public long getLocalRefreshTime() {

public void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
this.localRefreshSeqNo.set(localRefreshSeqNo);
computeSeqNoLag();
}

public void updateLocalRefreshTime(long localRefreshTime) {
this.localRefreshTime.set(localRefreshTime);
computeTimeLag();
}

public long getRemoteRefreshSeqNo() {
Expand All @@ -221,6 +227,7 @@ public long getRemoteRefreshSeqNo() {

public void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) {
this.remoteRefreshSeqNo.set(remoteRefreshSeqNo);
computeSeqNoLag();
}

public long getRemoteRefreshTime() {
Expand All @@ -229,14 +236,23 @@ public long getRemoteRefreshTime() {

public void updateRemoteRefreshTime(long remoteRefreshTime) {
this.remoteRefreshTime.set(remoteRefreshTime);
computeTimeLag();
}

private void computeSeqNoLag() {
seqNoLag.set(localRefreshSeqNo.get() - remoteRefreshSeqNo.get());
}

public long getSeqNoLag() {
return localRefreshSeqNo.get() - remoteRefreshSeqNo.get();
return seqNoLag.get();
}

private void computeTimeLag() {
timeMsLag.set(localRefreshTime.get() - remoteRefreshTime.get());
}

public long getTimeLag() {
return localRefreshTime.get() - remoteRefreshTime.get();
return timeMsLag.get();
}

public Map<String, Long> getLatestLocalFileNameLengthMap() {
Expand All @@ -245,12 +261,16 @@ public Map<String, Long> getLatestLocalFileNameLengthMap() {

public void updateLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
computeBytesLag();
}

public void updateLatestUploadFiles(Set<String> latestUploadFiles) {
this.latestUploadFiles = latestUploadFiles;
computeBytesLag();
}

private final AtomicLong bytesLag = new AtomicLong();

public int getConsecutiveFailureCount() {
return failures.length();
}
Expand Down Expand Up @@ -291,15 +311,20 @@ public double getUploadTimeAverage() {
return uploadTimeMovingAverage.getAverage();
}

public long getBytesLag() {
private void computeBytesLag() {
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
return 0;
return;
}
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()
.stream()
.filter(f -> latestUploadFiles == null || latestUploadFiles.contains(f) == false)
.filter(f -> !latestUploadFiles.contains(f))
.collect(Collectors.toSet());
return filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum();
long bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum();
this.bytesLag.set(bytesLag);
}

public long getBytesLag() {
return bytesLag.get();
}

public long getInflightUploadBytes() {
Expand All @@ -324,5 +349,6 @@ public void updateUploadTimeMovingAverageWindowSize(int updatedSize) {

public void addToLatestUploadFiles(String file) {
this.latestUploadFiles.add(file);
computeBytesLag();
}
}

0 comments on commit c2de3ef

Please sign in to comment.