Skip to content

Commit

Permalink
Incorporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Apr 27, 2023
1 parent cdfb8e0 commit 327cce8
Showing 1 changed file with 56 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.shard.ShardId;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -34,62 +35,62 @@ public class RemoteRefreshSegmentTracker {
/**
* Every refresh is assigned a sequence number. This is the sequence number of the most recent refresh.
*/
private final AtomicLong localRefreshSeqNo = new AtomicLong();
private volatile long localRefreshSeqNo;

/**
* The refresh time of the most recent refresh.
*/
private final AtomicLong localRefreshTimeMs = new AtomicLong();
private volatile long localRefreshTimeMs;

/**
* Sequence number of the most recent remote refresh.
*/
private final AtomicLong remoteRefreshSeqNo = new AtomicLong();
private volatile long remoteRefreshSeqNo;

/**
* The refresh time of most recent remote refresh.
*/
private final AtomicLong remoteRefreshTimeMs = new AtomicLong();
private volatile long remoteRefreshTimeMs;

/**
* Keeps the seq no lag computed so that we do not compute it for every request.
*/
private final AtomicLong seqNoLag = new AtomicLong();
private volatile long seqNoLag;

/**
* Keeps the time (ms) lag computed so that we do not compute it for every request.
*/
private final AtomicLong timeMsLag = new AtomicLong();
private volatile long timeMsLag;

/**
* Cumulative sum of size in bytes of segment files for which upload has started during remote refresh.
*/
private final AtomicLong uploadBytesStarted = new AtomicLong();
private volatile long uploadBytesStarted;

/**
* Cumulative sum of size in bytes of segment files for which upload has failed during remote refresh.
*/
private final AtomicLong uploadBytesFailed = new AtomicLong();
private volatile long uploadBytesFailed;

/**
* Cumulative sum of size in bytes of segment files for which upload has succeeded during remote refresh.
*/
private final AtomicLong uploadBytesSucceeded = new AtomicLong();
private volatile long uploadBytesSucceeded;

/**
* Cumulative sum of count of remote refreshes that have started.
*/
private final AtomicLong totalUploadsStarted = new AtomicLong();
private volatile long totalUploadsStarted;

/**
* Cumulative sum of count of remote refreshes that have failed.
*/
private final AtomicLong totalUploadsFailed = new AtomicLong();
private volatile long totalUploadsFailed;

/**
* Cumulative sum of count of remote refreshes that have succeeded.
*/
private final AtomicLong totalUploadsSucceeded = new AtomicLong();
private volatile long totalUploadsSucceeded;

/**
* Cumulative sum of rejection counts for this shard.
Expand All @@ -109,12 +110,12 @@ public class RemoteRefreshSegmentTracker {
/**
* Set of names of segment files that were uploaded as part of the most recent remote refresh.
*/
private final Set<String> latestUploadFiles = ConcurrentCollections.newConcurrentSet();
private final Set<String> latestUploadFiles = new HashSet<>();

/**
* Keeps the bytes lag computed so that we do not compute it for every request.
*/
private final AtomicLong bytesLag = new AtomicLong();
private volatile long bytesLag;

/**
* Holds count of consecutive failures until last success. Gets reset to zero if there is a success.
Expand Down Expand Up @@ -157,149 +158,149 @@ public RemoteRefreshSegmentTracker(
this.shardId = shardId;
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
long currentTimeMs = System.nanoTime() / 1_000_000L;
localRefreshTimeMs.set(currentTimeMs);
remoteRefreshTimeMs.set(currentTimeMs);
this.uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
this.uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
this.uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));
localRefreshTimeMs = currentTimeMs;
remoteRefreshTimeMs = currentTimeMs;
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));
}

ShardId getShardId() {
return shardId;
}

long getLocalRefreshSeqNo() {
return localRefreshSeqNo.get();
return localRefreshSeqNo;
}

void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
assert localRefreshSeqNo > this.localRefreshSeqNo.get() : "newLocalRefreshSeqNo="
assert localRefreshSeqNo > this.localRefreshSeqNo : "newLocalRefreshSeqNo="
+ localRefreshSeqNo
+ ">="
+ "currentLocalRefreshSeqNo="
+ this.localRefreshSeqNo.get();
this.localRefreshSeqNo.set(localRefreshSeqNo);
+ this.localRefreshSeqNo;
this.localRefreshSeqNo = localRefreshSeqNo;
computeSeqNoLag();
}

long getLocalRefreshTimeMs() {
return localRefreshTimeMs.get();
return localRefreshTimeMs;
}

void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs > this.localRefreshTimeMs.get() : "newLocalRefreshTimeMs="
assert localRefreshTimeMs > this.localRefreshTimeMs : "newLocalRefreshTimeMs="
+ localRefreshTimeMs
+ ">="
+ "currentLocalRefreshTimeMs="
+ this.localRefreshTimeMs.get();
this.localRefreshTimeMs.set(localRefreshTimeMs);
+ this.localRefreshTimeMs;
this.localRefreshTimeMs = localRefreshTimeMs;
computeTimeMsLag();
}

long getRemoteRefreshSeqNo() {
return remoteRefreshSeqNo.get();
return remoteRefreshSeqNo;
}

void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) {
assert remoteRefreshSeqNo > this.remoteRefreshSeqNo.get() : "newRemoteRefreshSeqNo="
assert remoteRefreshSeqNo > this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo="
+ remoteRefreshSeqNo
+ ">="
+ "currentRemoteRefreshSeqNo="
+ this.remoteRefreshSeqNo.get();
this.remoteRefreshSeqNo.set(remoteRefreshSeqNo);
+ this.remoteRefreshSeqNo;
this.remoteRefreshSeqNo = remoteRefreshSeqNo;
computeSeqNoLag();
}

long getRemoteRefreshTimeMs() {
return remoteRefreshTimeMs.get();
return remoteRefreshTimeMs;
}

void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
assert remoteRefreshTimeMs > this.remoteRefreshTimeMs.get() : "newRemoteRefreshTimeMs="
assert remoteRefreshTimeMs > this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
+ remoteRefreshTimeMs
+ ">="
+ "currentRemoteRefreshTimeMs="
+ this.remoteRefreshTimeMs.get();
this.remoteRefreshTimeMs.set(remoteRefreshTimeMs);
+ this.remoteRefreshTimeMs;
this.remoteRefreshTimeMs = remoteRefreshTimeMs;
computeTimeMsLag();
}

private void computeSeqNoLag() {
seqNoLag.set(localRefreshSeqNo.get() - remoteRefreshSeqNo.get());
seqNoLag = localRefreshSeqNo - remoteRefreshSeqNo;
}

long getSeqNoLag() {
return seqNoLag.get();
return seqNoLag;
}

private void computeTimeMsLag() {
timeMsLag.set(localRefreshTimeMs.get() - remoteRefreshTimeMs.get());
timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs;
}

long getTimeMsLag() {
return timeMsLag.get();
return timeMsLag;
}

long getBytesLag() {
return bytesLag.get();
return bytesLag;
}

long getUploadBytesStarted() {
return uploadBytesStarted.get();
return uploadBytesStarted;
}

void addUploadBytesStarted(long size) {
uploadBytesStarted.addAndGet(size);
uploadBytesStarted += size;
}

long getUploadBytesFailed() {
return uploadBytesFailed.get();
return uploadBytesFailed;
}

void addUploadBytesFailed(long size) {
uploadBytesFailed.addAndGet(size);
uploadBytesFailed += size;
}

long getUploadBytesSucceeded() {
return uploadBytesSucceeded.get();
return uploadBytesSucceeded;
}

void addUploadBytesSucceeded(long size) {
uploadBytesSucceeded.addAndGet(size);
uploadBytesSucceeded += size;
}

long getInflightUploadBytes() {
return uploadBytesStarted.get() - uploadBytesFailed.get() - uploadBytesSucceeded.get();
return uploadBytesStarted - uploadBytesFailed - uploadBytesSucceeded;
}

long getTotalUploadsStarted() {
return totalUploadsStarted.get();
return totalUploadsStarted;
}

void incrementTotalUploadsStarted() {
totalUploadsStarted.incrementAndGet();
totalUploadsStarted += 1;
}

long getTotalUploadsFailed() {
return totalUploadsFailed.get();
return totalUploadsFailed;
}

void incrementTotalUploadsFailed() {
totalUploadsFailed.incrementAndGet();
totalUploadsFailed += 1;
failures.record(true);
}

long getTotalUploadsSucceeded() {
return totalUploadsSucceeded.get();
return totalUploadsSucceeded;
}

void incrementTotalUploadSucceeded() {
totalUploadsSucceeded.incrementAndGet();
totalUploadsSucceeded += 1;
failures.record(false);
}

long getInflightUploads() {
return totalUploadsStarted.get() - totalUploadsFailed.get() - totalUploadsSucceeded.get();
return totalUploadsStarted - totalUploadsFailed - totalUploadsSucceeded;
}

long getRejectionCount() {
Expand Down Expand Up @@ -340,8 +341,7 @@ private void computeBytesLag() {
.stream()
.filter(f -> !latestUploadFiles.contains(f))
.collect(Collectors.toSet());
long bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum();
this.bytesLag.set(bytesLag);
this.bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum();
}

int getConsecutiveFailureCount() {
Expand Down

0 comments on commit 327cce8

Please sign in to comment.