diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index a331046a71983..50a9524291b4a 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -13,6 +13,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.shard.ShardId; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -34,62 +35,62 @@ public class RemoteRefreshSegmentTracker { /** * Every refresh is assigned a sequence number. This is the sequence number of the most recent refresh. */ - private final AtomicLong localRefreshSeqNo = new AtomicLong(); + private volatile long localRefreshSeqNo; /** * The refresh time of the most recent refresh. */ - private final AtomicLong localRefreshTimeMs = new AtomicLong(); + private volatile long localRefreshTimeMs; /** * Sequence number of the most recent remote refresh. */ - private final AtomicLong remoteRefreshSeqNo = new AtomicLong(); + private volatile long remoteRefreshSeqNo; /** * The refresh time of most recent remote refresh. */ - private final AtomicLong remoteRefreshTimeMs = new AtomicLong(); + private volatile long remoteRefreshTimeMs; /** * Keeps the seq no lag computed so that we do not compute it for every request. */ - private final AtomicLong seqNoLag = new AtomicLong(); + private volatile long seqNoLag; /** * Keeps the time (ms) lag computed so that we do not compute it for every request. */ - private final AtomicLong timeMsLag = new AtomicLong(); + private volatile long timeMsLag; /** * Cumulative sum of size in bytes of segment files for which upload has started during remote refresh. */ - private final AtomicLong uploadBytesStarted = new AtomicLong(); + private volatile long uploadBytesStarted; /** * Cumulative sum of size in bytes of segment files for which upload has failed during remote refresh. */ - private final AtomicLong uploadBytesFailed = new AtomicLong(); + private volatile long uploadBytesFailed; /** * Cumulative sum of size in bytes of segment files for which upload has succeeded during remote refresh. */ - private final AtomicLong uploadBytesSucceeded = new AtomicLong(); + private volatile long uploadBytesSucceeded; /** * Cumulative sum of count of remote refreshes that have started. */ - private final AtomicLong totalUploadsStarted = new AtomicLong(); + private volatile long totalUploadsStarted; /** * Cumulative sum of count of remote refreshes that have failed. */ - private final AtomicLong totalUploadsFailed = new AtomicLong(); + private volatile long totalUploadsFailed; /** * Cumulative sum of count of remote refreshes that have succeeded. */ - private final AtomicLong totalUploadsSucceeded = new AtomicLong(); + private volatile long totalUploadsSucceeded; /** * Cumulative sum of rejection counts for this shard. @@ -109,12 +110,12 @@ public class RemoteRefreshSegmentTracker { /** * Set of names of segment files that were uploaded as part of the most recent remote refresh. */ - private final Set latestUploadFiles = ConcurrentCollections.newConcurrentSet(); + private final Set latestUploadFiles = new HashSet<>(); /** * Keeps the bytes lag computed so that we do not compute it for every request. */ - private final AtomicLong bytesLag = new AtomicLong(); + private volatile long bytesLag; /** * Holds count of consecutive failures until last success. Gets reset to zero if there is a success. @@ -157,11 +158,11 @@ public RemoteRefreshSegmentTracker( this.shardId = shardId; // Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises. long currentTimeMs = System.nanoTime() / 1_000_000L; - localRefreshTimeMs.set(currentTimeMs); - remoteRefreshTimeMs.set(currentTimeMs); - this.uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize)); - this.uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize)); - this.uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize)); + localRefreshTimeMs = currentTimeMs; + remoteRefreshTimeMs = currentTimeMs; + uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize)); + uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize)); + uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize)); } ShardId getShardId() { @@ -169,137 +170,137 @@ ShardId getShardId() { } long getLocalRefreshSeqNo() { - return localRefreshSeqNo.get(); + return localRefreshSeqNo; } void updateLocalRefreshSeqNo(long localRefreshSeqNo) { - assert localRefreshSeqNo > this.localRefreshSeqNo.get() : "newLocalRefreshSeqNo=" + assert localRefreshSeqNo > this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + localRefreshSeqNo + ">=" + "currentLocalRefreshSeqNo=" - + this.localRefreshSeqNo.get(); - this.localRefreshSeqNo.set(localRefreshSeqNo); + + this.localRefreshSeqNo; + this.localRefreshSeqNo = localRefreshSeqNo; computeSeqNoLag(); } long getLocalRefreshTimeMs() { - return localRefreshTimeMs.get(); + return localRefreshTimeMs; } void updateLocalRefreshTimeMs(long localRefreshTimeMs) { - assert localRefreshTimeMs > this.localRefreshTimeMs.get() : "newLocalRefreshTimeMs=" + assert localRefreshTimeMs > this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + localRefreshTimeMs + ">=" + "currentLocalRefreshTimeMs=" - + this.localRefreshTimeMs.get(); - this.localRefreshTimeMs.set(localRefreshTimeMs); + + this.localRefreshTimeMs; + this.localRefreshTimeMs = localRefreshTimeMs; computeTimeMsLag(); } long getRemoteRefreshSeqNo() { - return remoteRefreshSeqNo.get(); + return remoteRefreshSeqNo; } void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { - assert remoteRefreshSeqNo > this.remoteRefreshSeqNo.get() : "newRemoteRefreshSeqNo=" + assert remoteRefreshSeqNo > this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo=" + remoteRefreshSeqNo + ">=" + "currentRemoteRefreshSeqNo=" - + this.remoteRefreshSeqNo.get(); - this.remoteRefreshSeqNo.set(remoteRefreshSeqNo); + + this.remoteRefreshSeqNo; + this.remoteRefreshSeqNo = remoteRefreshSeqNo; computeSeqNoLag(); } long getRemoteRefreshTimeMs() { - return remoteRefreshTimeMs.get(); + return remoteRefreshTimeMs; } void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { - assert remoteRefreshTimeMs > this.remoteRefreshTimeMs.get() : "newRemoteRefreshTimeMs=" + assert remoteRefreshTimeMs > this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs=" + remoteRefreshTimeMs + ">=" + "currentRemoteRefreshTimeMs=" - + this.remoteRefreshTimeMs.get(); - this.remoteRefreshTimeMs.set(remoteRefreshTimeMs); + + this.remoteRefreshTimeMs; + this.remoteRefreshTimeMs = remoteRefreshTimeMs; computeTimeMsLag(); } private void computeSeqNoLag() { - seqNoLag.set(localRefreshSeqNo.get() - remoteRefreshSeqNo.get()); + seqNoLag = localRefreshSeqNo - remoteRefreshSeqNo; } long getSeqNoLag() { - return seqNoLag.get(); + return seqNoLag; } private void computeTimeMsLag() { - timeMsLag.set(localRefreshTimeMs.get() - remoteRefreshTimeMs.get()); + timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs; } long getTimeMsLag() { - return timeMsLag.get(); + return timeMsLag; } long getBytesLag() { - return bytesLag.get(); + return bytesLag; } long getUploadBytesStarted() { - return uploadBytesStarted.get(); + return uploadBytesStarted; } void addUploadBytesStarted(long size) { - uploadBytesStarted.addAndGet(size); + uploadBytesStarted += size; } long getUploadBytesFailed() { - return uploadBytesFailed.get(); + return uploadBytesFailed; } void addUploadBytesFailed(long size) { - uploadBytesFailed.addAndGet(size); + uploadBytesFailed += size; } long getUploadBytesSucceeded() { - return uploadBytesSucceeded.get(); + return uploadBytesSucceeded; } void addUploadBytesSucceeded(long size) { - uploadBytesSucceeded.addAndGet(size); + uploadBytesSucceeded += size; } long getInflightUploadBytes() { - return uploadBytesStarted.get() - uploadBytesFailed.get() - uploadBytesSucceeded.get(); + return uploadBytesStarted - uploadBytesFailed - uploadBytesSucceeded; } long getTotalUploadsStarted() { - return totalUploadsStarted.get(); + return totalUploadsStarted; } void incrementTotalUploadsStarted() { - totalUploadsStarted.incrementAndGet(); + totalUploadsStarted += 1; } long getTotalUploadsFailed() { - return totalUploadsFailed.get(); + return totalUploadsFailed; } void incrementTotalUploadsFailed() { - totalUploadsFailed.incrementAndGet(); + totalUploadsFailed += 1; failures.record(true); } long getTotalUploadsSucceeded() { - return totalUploadsSucceeded.get(); + return totalUploadsSucceeded; } void incrementTotalUploadSucceeded() { - totalUploadsSucceeded.incrementAndGet(); + totalUploadsSucceeded += 1; failures.record(false); } long getInflightUploads() { - return totalUploadsStarted.get() - totalUploadsFailed.get() - totalUploadsSucceeded.get(); + return totalUploadsStarted - totalUploadsFailed - totalUploadsSucceeded; } long getRejectionCount() { @@ -340,8 +341,7 @@ private void computeBytesLag() { .stream() .filter(f -> !latestUploadFiles.contains(f)) .collect(Collectors.toSet()); - long bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum(); - this.bytesLag.set(bytesLag); + this.bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum(); } int getConsecutiveFailureCount() {