/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;

import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Service used to validate if an incoming indexing request should be rejected based on the
 * per-shard {@link RemoteRefreshSegmentPressureTracker} state.
 */
public class RemoteRefreshSegmentPressureService implements IndexEventListener {

    private static final Logger logger = LogManager.getLogger(RemoteRefreshSegmentPressureService.class);

    /**
     * Keeps map of remote-backed index shards and their corresponding backpressure tracker.
     */
    private final Map<ShardId, RemoteRefreshSegmentPressureTracker> trackerMap = ConcurrentCollections.newConcurrentMap();

    /**
     * Remote refresh segment pressure settings which are used for creation of the backpressure tracker as well as rejection.
     */
    private final RemoteRefreshSegmentPressureSettings pressureSettings;

    @Inject
    public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settings settings) {
        pressureSettings = new RemoteRefreshSegmentPressureSettings(clusterService, settings, this);
    }

    /**
     * Get {@code RemoteRefreshSegmentPressureTracker} only if the underlying index has remote segments integration enabled.
     *
     * @param shardId shard id
     * @return the tracker if the index is remote-backed, else null.
     */
    public RemoteRefreshSegmentPressureTracker getStatsTracker(ShardId shardId) {
        return trackerMap.get(shardId);
    }

    @Override
    public void afterIndexShardCreated(IndexShard indexShard) {
        // Trackers are only maintained for remote-backed shards.
        if (indexShard.indexSettings().isRemoteStoreEnabled() == false) {
            return;
        }
        ShardId shardId = indexShard.shardId();
        trackerMap.put(shardId, new RemoteRefreshSegmentPressureTracker(shardId, pressureSettings));
        logger.trace("Created tracker for shardId={}", shardId);
    }

    @Override
    public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
        if (indexShard.indexSettings().isRemoteStoreEnabled() == false) {
            return;
        }
        trackerMap.remove(shardId);
        logger.trace("Deleted tracker for shardId={}", shardId);
    }

    /**
     * Check if remote refresh segments backpressure is enabled. This is backed by a cluster level setting.
     *
     * @return true if enabled, else false.
     */
    public boolean isSegmentsUploadBackpressureEnabled() {
        return pressureSettings.isRemoteRefreshSegmentPressureEnabled();
    }

    /**
     * Validates that remote segment upload is not lagging too far behind local refreshes for the given shard.
     * Throws {@link OpenSearchRejectedExecutionException} (after bumping the shard's rejection count) when any
     * of the lag limits is breached.
     *
     * @param shardId shard id of the primary shard being indexed into
     */
    public void validateSegmentsUploadLag(ShardId shardId) {
        RemoteRefreshSegmentPressureTracker pressureTracker = getStatsTracker(shardId);
        if (pressureTracker == null) {
            // Tracker exists only for remote-backed shards; nothing to validate otherwise.
            return;
        }
        // Check if refresh checkpoint (a.k.a. seq number) lag is 2 or below - this is to handle segment merges that can
        // increase the bytes to upload almost suddenly.
        if (pressureTracker.getSeqNoLag() <= 2) {
            return;
        }

        // Check if the remote store seq no lag is above the min seq no lag limit
        validateSeqNoLag(pressureTracker, shardId);

        // Check if the remote store is lagging more than the upload bytes average multiplied by a variance factor
        validateBytesLag(pressureTracker, shardId);

        // Check if the remote store is lagging more than the upload time average multiplied by a variance factor
        validateTimeLag(pressureTracker, shardId);

        // Check if consecutive failure limit has been breached
        validateConsecutiveFailureLimitBreached(pressureTracker, shardId);
    }

    private void validateSeqNoLag(RemoteRefreshSegmentPressureTracker pressureTracker, ShardId shardId) {
        // Check if the remote store seq no lag is above the min seq no lag limit
        if (pressureTracker.getSeqNoLag() > pressureSettings.getMinSeqNoLagLimit()) {
            pressureTracker.incrementRejectionCount();
            throw new OpenSearchRejectedExecutionException(
                String.format(
                    Locale.ROOT,
                    "rejected execution on primary shard:%s due to remote segments lagging behind local segments. "
                        + "remote_refresh_seq_no:%s local_refresh_seq_no:%s",
                    shardId,
                    pressureTracker.getRemoteRefreshSeqNo(),
                    pressureTracker.getLocalRefreshSeqNo()
                )
            );
        }
    }

    private void validateBytesLag(RemoteRefreshSegmentPressureTracker pressureTracker, ShardId shardId) {
        // The dynamic threshold is meaningless until the moving average window has filled up.
        if (pressureTracker.isUploadBytesAverageReady() == false) {
            return;
        }
        double dynamicBytesLagThreshold = pressureTracker.getUploadBytesAverage() * pressureSettings.getBytesLagVarianceThreshold();
        long bytesLag = pressureTracker.getBytesLag();
        if (bytesLag > dynamicBytesLagThreshold) {
            pressureTracker.incrementRejectionCount();
            throw new OpenSearchRejectedExecutionException(
                String.format(
                    Locale.ROOT,
                    "rejected execution on primary shard:%s due to remote segments lagging behind local segments. "
                        + "bytes_lag:%s dynamic_bytes_lag_threshold:%s",
                    shardId,
                    bytesLag,
                    dynamicBytesLagThreshold
                )
            );
        }
    }

    private void validateTimeLag(RemoteRefreshSegmentPressureTracker pressureTracker, ShardId shardId) {
        if (pressureTracker.isUploadTimeMsAverageReady() == false) {
            return;
        }
        long timeLag = pressureTracker.getTimeMsLag();
        double dynamicTimeLagThreshold = pressureTracker.getUploadTimeMsAverage() * pressureSettings.getTimeLagVarianceThreshold();
        if (timeLag > dynamicTimeLagThreshold) {
            pressureTracker.incrementRejectionCount();
            // Values are milliseconds (getTimeMsLag / getUploadTimeMsAverage), so the message reports ms.
            throw new OpenSearchRejectedExecutionException(
                String.format(
                    Locale.ROOT,
                    "rejected execution on primary shard:%s due to remote segments lagging behind local segments. "
                        + "time_lag:%s ms dynamic_time_lag_threshold:%s ms",
                    shardId,
                    timeLag,
                    dynamicTimeLagThreshold
                )
            );
        }
    }

    private void validateConsecutiveFailureLimitBreached(RemoteRefreshSegmentPressureTracker pressureTracker, ShardId shardId) {
        int failureStreakCount = pressureTracker.getConsecutiveFailureCount();
        int minConsecutiveFailureThreshold = pressureSettings.getMinConsecutiveFailuresLimit();
        if (failureStreakCount > minConsecutiveFailureThreshold) {
            pressureTracker.incrementRejectionCount();
            throw new OpenSearchRejectedExecutionException(
                String.format(
                    Locale.ROOT,
                    "rejected execution on primary shard:%s due to remote segments lagging behind local segments. "
                        + "failure_streak_count:%s min_consecutive_failure_threshold:%s",
                    shardId,
                    failureStreakCount,
                    minConsecutiveFailureThreshold
                )
            );
        }
    }

    void updateUploadBytesMovingAverageWindowSize(int updatedSize) {
        updateMovingAverageWindowSize(RemoteRefreshSegmentPressureTracker::updateUploadBytesMovingAverageWindowSize, updatedSize);
    }

    void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) {
        updateMovingAverageWindowSize(RemoteRefreshSegmentPressureTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize);
    }

    void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) {
        updateMovingAverageWindowSize(RemoteRefreshSegmentPressureTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize);
    }

    // Applies the given window-size update to every registered tracker.
    void updateMovingAverageWindowSize(BiConsumer<RemoteRefreshSegmentPressureTracker, Integer> biConsumer, int updatedSize) {
        trackerMap.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize));
    }
}
+ */ +public class RemoteRefreshSegmentPressureSettings { + + private static class Defaults { + private static final long MIN_SEQ_NO_LAG_LIMIT = 5; + private static final double BYTES_LAG_VARIANCE_THRESHOLD = 2.0; + private static final double TIME_LAG_VARIANCE_THRESHOLD = 2.0; + private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 10; + private static final int UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE = 20; + private static final int UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE = 20; + private static final int UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE = 20; + } + + public static final Setting REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED = Setting.boolSetting( + "remote_store.segment.pressure.enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting MIN_SEQ_NO_LAG_LIMIT = Setting.longSetting( + "remote_store.segment.pressure.seq_no_lag.limit", + Defaults.MIN_SEQ_NO_LAG_LIMIT, + 2L, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting BYTES_LAG_VARIANCE_THRESHOLD = Setting.doubleSetting( + "remote_store.segment.pressure.bytes_lag.variance", + Defaults.BYTES_LAG_VARIANCE_THRESHOLD, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting TIME_LAG_VARIANCE_THRESHOLD = Setting.doubleSetting( + "remote_store.segment.pressure.time_lag.variance", + Defaults.TIME_LAG_VARIANCE_THRESHOLD, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting MIN_CONSECUTIVE_FAILURES_LIMIT = Setting.intSetting( + "remote_store.segment.pressure.consecutive_failures.limit", + Defaults.MIN_CONSECUTIVE_FAILURES_LIMIT, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "remote_store.segment.pressure.upload_bytes_moving_average_window_size", + Defaults.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, + 0, + 
Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "remote_store.segment.pressure.upload_bytes_per_sec_moving_average_window_size", + Defaults.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "remote_store.segment.pressure.upload_time_moving_average_window_size", + Defaults.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile boolean remoteRefreshSegmentPressureEnabled; + + private volatile long minSeqNoLagLimit; + + private volatile double bytesLagVarianceThreshold; + + private volatile double timeLagVarianceThreshold; + + private volatile int minConsecutiveFailuresLimit; + + private volatile int uploadBytesMovingAverageWindowSize; + + private volatile int uploadBytesPerSecMovingAverageWindowSize; + + private volatile int uploadTimeMovingAverageWindowSize; + + public RemoteRefreshSegmentPressureSettings( + ClusterService clusterService, + Settings settings, + RemoteRefreshSegmentPressureService remoteUploadPressureService + ) { + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + + this.remoteRefreshSegmentPressureEnabled = REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.get(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, this::setRemoteRefreshSegmentPressureEnabled); + + this.minSeqNoLagLimit = MIN_SEQ_NO_LAG_LIMIT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MIN_SEQ_NO_LAG_LIMIT, this::setMinSeqNoLagLimit); + + this.bytesLagVarianceThreshold = BYTES_LAG_VARIANCE_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(BYTES_LAG_VARIANCE_THRESHOLD, this::setBytesLagVarianceThreshold); + + this.timeLagVarianceThreshold = 
TIME_LAG_VARIANCE_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(TIME_LAG_VARIANCE_THRESHOLD, this::setTimeLagVarianceThreshold); + + this.minConsecutiveFailuresLimit = MIN_CONSECUTIVE_FAILURES_LIMIT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MIN_CONSECUTIVE_FAILURES_LIMIT, this::setMinConsecutiveFailuresLimit); + + this.uploadBytesMovingAverageWindowSize = UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer( + UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, + remoteUploadPressureService::updateUploadBytesMovingAverageWindowSize + ); + clusterSettings.addSettingsUpdateConsumer(UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, this::setUploadBytesMovingAverageWindowSize); + + this.uploadBytesPerSecMovingAverageWindowSize = UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer( + UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, + remoteUploadPressureService::updateUploadBytesPerSecMovingAverageWindowSize + ); + clusterSettings.addSettingsUpdateConsumer( + UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, + this::setUploadBytesPerSecMovingAverageWindowSize + ); + + this.uploadTimeMovingAverageWindowSize = UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer( + UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, + remoteUploadPressureService::updateUploadTimeMsMovingAverageWindowSize + ); + clusterSettings.addSettingsUpdateConsumer(UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, this::setUploadTimeMovingAverageWindowSize); + } + + public boolean isRemoteRefreshSegmentPressureEnabled() { + return remoteRefreshSegmentPressureEnabled; + } + + public void setRemoteRefreshSegmentPressureEnabled(boolean remoteRefreshSegmentPressureEnabled) { + this.remoteRefreshSegmentPressureEnabled = remoteRefreshSegmentPressureEnabled; + } + + public long getMinSeqNoLagLimit() { + return minSeqNoLagLimit; + } + + public void 
setMinSeqNoLagLimit(long minSeqNoLagLimit) { + this.minSeqNoLagLimit = minSeqNoLagLimit; + } + + public double getBytesLagVarianceThreshold() { + return bytesLagVarianceThreshold; + } + + public void setBytesLagVarianceThreshold(double bytesLagVarianceThreshold) { + this.bytesLagVarianceThreshold = bytesLagVarianceThreshold; + } + + public double getTimeLagVarianceThreshold() { + return timeLagVarianceThreshold; + } + + public void setTimeLagVarianceThreshold(double timeLagVarianceThreshold) { + this.timeLagVarianceThreshold = timeLagVarianceThreshold; + } + + public int getMinConsecutiveFailuresLimit() { + return minConsecutiveFailuresLimit; + } + + public void setMinConsecutiveFailuresLimit(int minConsecutiveFailuresLimit) { + this.minConsecutiveFailuresLimit = minConsecutiveFailuresLimit; + } + + public int getUploadBytesMovingAverageWindowSize() { + return uploadBytesMovingAverageWindowSize; + } + + public void setUploadBytesMovingAverageWindowSize(int uploadBytesMovingAverageWindowSize) { + this.uploadBytesMovingAverageWindowSize = uploadBytesMovingAverageWindowSize; + } + + public int getUploadBytesPerSecMovingAverageWindowSize() { + return uploadBytesPerSecMovingAverageWindowSize; + } + + public void setUploadBytesPerSecMovingAverageWindowSize(int uploadBytesPerSecMovingAverageWindowSize) { + this.uploadBytesPerSecMovingAverageWindowSize = uploadBytesPerSecMovingAverageWindowSize; + } + + public int getUploadTimeMovingAverageWindowSize() { + return uploadTimeMovingAverageWindowSize; + } + + public void setUploadTimeMovingAverageWindowSize(int uploadTimeMovingAverageWindowSize) { + this.uploadTimeMovingAverageWindowSize = uploadTimeMovingAverageWindowSize; + } +} diff --git a/server/src/main/java/org/opensearch/index/RemoteRefreshSegmentPressureTracker.java b/server/src/main/java/org/opensearch/index/RemoteRefreshSegmentPressureTracker.java new file mode 100644 index 0000000000000..3fc6e6ce41e29 --- /dev/null +++ 
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index;

import org.opensearch.common.util.MovingAverage;
import org.opensearch.common.util.Streak;
import org.opensearch.index.shard.ShardId;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}.
 * This consists of multiple critical metrics.
 */
public class RemoteRefreshSegmentPressureTracker {

    RemoteRefreshSegmentPressureTracker(ShardId shardId, RemoteRefreshSegmentPressureSettings remoteUploadPressureSettings) {
        this.shardId = shardId;
        // Both the local refresh time and remote refresh time are set with current time to give consistent view
        // of time lag when it arises. Derived from the monotonic clock since only relative lag is ever computed.
        long currentTimeMs = System.nanoTime() / 1_000_000L;
        localRefreshTimeMs.set(currentTimeMs);
        remoteRefreshTimeMs.set(currentTimeMs);
        uploadBytesMovingAverageReference = new AtomicReference<>(
            new MovingAverage(remoteUploadPressureSettings.getUploadBytesMovingAverageWindowSize())
        );
        uploadBytesPerSecMovingAverageReference = new AtomicReference<>(
            new MovingAverage(remoteUploadPressureSettings.getUploadBytesPerSecMovingAverageWindowSize())
        );
        uploadTimeMsMovingAverageReference = new AtomicReference<>(
            new MovingAverage(remoteUploadPressureSettings.getUploadTimeMovingAverageWindowSize())
        );
    }

    /**
     * ShardId for which this instance tracks the remote segment upload metadata.
     */
    private final ShardId shardId;

    /**
     * Every refresh is assigned a sequence number. This is the sequence number of the most recent refresh.
     */
    private final AtomicLong localRefreshSeqNo = new AtomicLong();

    /**
     * The refresh time of the most recent refresh.
     */
    private final AtomicLong localRefreshTimeMs = new AtomicLong();

    /**
     * Sequence number of the most recent remote refresh.
     */
    private final AtomicLong remoteRefreshSeqNo = new AtomicLong();

    /**
     * The refresh time of most recent remote refresh.
     */
    private final AtomicLong remoteRefreshTimeMs = new AtomicLong();

    /**
     * Keeps the seq no lag computed so that we do not compute it for every request.
     */
    private final AtomicLong seqNoLag = new AtomicLong();

    /**
     * Keeps the time (ms) lag computed so that we do not compute it for every request.
     */
    private final AtomicLong timeMsLag = new AtomicLong();

    /**
     * Cumulative sum of size in bytes of segment files for which upload has started during remote refresh.
     */
    private final AtomicLong uploadBytesStarted = new AtomicLong();

    /**
     * Cumulative sum of size in bytes of segment files for which upload has failed during remote refresh.
     */
    private final AtomicLong uploadBytesFailed = new AtomicLong();

    /**
     * Cumulative sum of size in bytes of segment files for which upload has succeeded during remote refresh.
     */
    private final AtomicLong uploadBytesSucceeded = new AtomicLong();

    /**
     * Cumulative sum of count of remote refreshes that have started.
     */
    private final AtomicLong totalUploadsStarted = new AtomicLong();

    /**
     * Cumulative sum of count of remote refreshes that have failed.
     */
    private final AtomicLong totalUploadsFailed = new AtomicLong();

    /**
     * Cumulative sum of count of remote refreshes that have succeeded.
     */
    private final AtomicLong totalUploadsSucceeded = new AtomicLong();

    /**
     * Cumulative sum of rejection counts for this shard.
     */
    private final AtomicLong rejectionCount = new AtomicLong();

    /**
     * Map of name to size of the segment files created as part of the most recent refresh.
     */
    private volatile Map<String, Long> latestLocalFileNameLengthMap;

    /**
     * Set of names of segment files that were uploaded as part of the most recent remote refresh.
     * Initialized eagerly so {@link #addToLatestUploadFiles(String)} never dereferences null.
     */
    private volatile Set<String> latestUploadFiles = ConcurrentHashMap.newKeySet();

    /**
     * Keeps the bytes lag computed so that we do not compute it for every request.
     */
    private final AtomicLong bytesLag = new AtomicLong();

    /**
     * Holds count of consecutive failures until last success. Gets reset to zero if there is a success.
     */
    private final Streak failures = new Streak();

    /**
     * Provides moving average over the last N total size in bytes of segment files uploaded as part of remote refresh.
     * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size.
     */
    private final AtomicReference<MovingAverage> uploadBytesMovingAverageReference;

    /**
     * Provides moving average over the last N upload speed (in bytes/s) of segment files uploaded as part of remote refresh.
     * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size.
     */
    private final AtomicReference<MovingAverage> uploadBytesPerSecMovingAverageReference;

    /**
     * Provides moving average over the last N overall upload time (in ms) as part of remote refresh. N is window size.
     * Wrapped with {@code AtomicReference} for dynamic changes in window size.
     */
    private final AtomicReference<MovingAverage> uploadTimeMsMovingAverageReference;

    ShardId getShardId() {
        return shardId;
    }

    AtomicLong getLocalRefreshSeqNo() {
        return localRefreshSeqNo;
    }

    void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
        this.localRefreshSeqNo.set(localRefreshSeqNo);
        computeSeqNoLag();
    }

    AtomicLong getLocalRefreshTimeMs() {
        return localRefreshTimeMs;
    }

    void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
        this.localRefreshTimeMs.set(localRefreshTimeMs);
        computeTimeMsLag();
    }

    AtomicLong getRemoteRefreshSeqNo() {
        return remoteRefreshSeqNo;
    }

    void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) {
        this.remoteRefreshSeqNo.set(remoteRefreshSeqNo);
        computeSeqNoLag();
    }

    AtomicLong getRemoteRefreshTimeMs() {
        return remoteRefreshTimeMs;
    }

    void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
        this.remoteRefreshTimeMs.set(remoteRefreshTimeMs);
        computeTimeMsLag();
    }

    private void computeSeqNoLag() {
        seqNoLag.set(localRefreshSeqNo.get() - remoteRefreshSeqNo.get());
    }

    long getSeqNoLag() {
        return seqNoLag.get();
    }

    private void computeTimeMsLag() {
        timeMsLag.set(localRefreshTimeMs.get() - remoteRefreshTimeMs.get());
    }

    long getTimeMsLag() {
        return timeMsLag.get();
    }

    long getBytesLag() {
        return bytesLag.get();
    }

    AtomicLong getUploadBytesStarted() {
        return uploadBytesStarted;
    }

    void addUploadBytesStarted(long size) {
        uploadBytesStarted.addAndGet(size);
    }

    AtomicLong getUploadBytesFailed() {
        return uploadBytesFailed;
    }

    void addUploadBytesFailed(long size) {
        uploadBytesFailed.addAndGet(size);
    }

    AtomicLong getUploadBytesSucceeded() {
        return uploadBytesSucceeded;
    }

    void addUploadBytesSucceeded(long size) {
        uploadBytesSucceeded.addAndGet(size);
    }

    long getInflightUploadBytes() {
        return uploadBytesStarted.get() - uploadBytesFailed.get() - uploadBytesSucceeded.get();
    }

    AtomicLong getTotalUploadsStarted() {
        return totalUploadsStarted;
    }

    void incrementTotalUploadsStarted() {
        totalUploadsStarted.incrementAndGet();
    }

    public AtomicLong getTotalUploadsFailed() {
        return totalUploadsFailed;
    }

    void incrementTotalUploadsFailed() {
        totalUploadsFailed.incrementAndGet();
        failures.record(true);
    }

    AtomicLong getTotalUploadsSucceeded() {
        return totalUploadsSucceeded;
    }

    void incrementTotalUploadSucceeded() {
        totalUploadsSucceeded.incrementAndGet();
        failures.record(false);
    }

    long getInflightUploads() {
        return totalUploadsStarted.get() - totalUploadsFailed.get() - totalUploadsSucceeded.get();
    }

    AtomicLong getRejectionCount() {
        return rejectionCount;
    }

    void incrementRejectionCount() {
        rejectionCount.incrementAndGet();
    }

    Map<String, Long> getLatestLocalFileNameLengthMap() {
        return latestLocalFileNameLengthMap;
    }

    void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
        this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
        computeBytesLag();
    }

    void addToLatestUploadFiles(String file) {
        this.latestUploadFiles.add(file);
        computeBytesLag();
    }

    // Bytes lag = total size of local segment files that have not yet been uploaded remotely.
    private void computeBytesLag() {
        if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
            return;
        }
        Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()
            .stream()
            .filter(f -> latestUploadFiles == null || latestUploadFiles.contains(f) == false)
            .collect(Collectors.toSet());
        long bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum();
        this.bytesLag.set(bytesLag);
    }

    int getConsecutiveFailureCount() {
        return failures.length();
    }

    boolean isUploadBytesAverageReady() {
        return uploadBytesMovingAverageReference.get().isReady();
    }

    double getUploadBytesAverage() {
        return uploadBytesMovingAverageReference.get().getAverage();
    }

    /**
     * Updates the window size for data collection of upload bytes. This also resets any data collected so far.
     *
     * @param updatedSize the updated size
     */
    void updateUploadBytesMovingAverageWindowSize(int updatedSize) {
        this.uploadBytesMovingAverageReference.set(new MovingAverage(updatedSize));
    }

    double getUploadBytesPerSecAverage() {
        return uploadBytesPerSecMovingAverageReference.get().getAverage();
    }

    /**
     * Updates the window size for data collection of upload bytes per second. This also resets any data collected so far.
     *
     * @param updatedSize the updated size
     */
    void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) {
        this.uploadBytesPerSecMovingAverageReference.set(new MovingAverage(updatedSize));
    }

    boolean isUploadTimeMsAverageReady() {
        return uploadTimeMsMovingAverageReference.get().isReady();
    }

    double getUploadTimeMsAverage() {
        return uploadTimeMsMovingAverageReference.get().getAverage();
    }

    /**
     * Updates the window size for data collection of upload time (ms). This also resets any data collected so far.
     *
     * @param updatedSize the updated size
     */
    void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) {
        this.uploadTimeMsMovingAverageReference.set(new MovingAverage(updatedSize));
    }
}