diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java index d29db2820b8b4..280381a7b6109 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -108,13 +108,9 @@ public boolean isSegmentsUploadBackpressureEnabled() { */ public void validateSegmentsUploadLag(ShardId shardId) { RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId); - // This will be null for non-remote backed indexes - if (remoteRefreshSegmentTracker == null) { - return; - } - // Check if refresh checkpoint (a.k.a. seq number) lag is 2 or below - this is to handle segment merges that can - // increase the bytes to upload almost suddenly. - if (remoteRefreshSegmentTracker.getRefreshSeqNoLag() <= 1) { + // condition 1 - This will be null for non-remote backed indexes + // condition 2 - This will be zero if the remote store is + if (remoteRefreshSegmentTracker == null || remoteRefreshSegmentTracker.getRefreshSeqNoLag() == 0) { return; } @@ -189,6 +185,9 @@ private BytesLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) @Override public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + if (pressureTracker.getRefreshSeqNoLag() <= 1) { + return true; + } if (pressureTracker.isUploadBytesAverageReady() == false) { logger.trace("upload bytes moving average is not ready"); return true; @@ -232,6 +231,9 @@ private TimeLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) @Override public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + if (pressureTracker.getRefreshSeqNoLag() <= 1) { + return true; + } if (pressureTracker.isUploadTimeMsAverageReady() == false) { logger.trace("upload time moving average is not ready"); return true; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java index 3da21506d60e9..2a098b8f7a89b 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java @@ -21,8 +21,8 @@ public class RemoteRefreshSegmentPressureSettings { private static class Defaults { - private static final double BYTES_LAG_VARIANCE_FACTOR = 2.0; - private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 2.0; + private static final double BYTES_LAG_VARIANCE_FACTOR = 10.0; + private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 10.0; private static final double VARIANCE_FACTOR_MIN_VALUE = 1.0; private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 5; private static final int MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE = 1; diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index 920607ffc8b16..1bab4bbfd9d31 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -111,11 +111,11 @@ public void testValidateSegmentUploadLag() { }); double avg = (double) sum.get() / 20; long currentMs = System.nanoTime() / 1_000_000; - pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 4 * avg)); + pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 12 * avg)); pressureTracker.updateRemoteRefreshTimeMs(currentMs); Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("time_lag:38 ms dynamic_time_lag_threshold:19.0 ms")); + assertTrue(e.getMessage().contains("time_lag:114 ms dynamic_time_lag_threshold:95.0 ms")); pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg)); pressureService.validateSegmentsUploadLag(shardId); @@ -128,11 +128,11 @@ public void testValidateSegmentUploadLag() { }); avg = (double) sum.get() / 20; Map nameSizeMap = new HashMap<>(); - nameSizeMap.put("a", (long) (4 * avg)); + nameSizeMap.put("a", (long) (12 * avg)); pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("bytes_lag:38 dynamic_bytes_lag_threshold:19.0")); + assertTrue(e.getMessage().contains("bytes_lag:114 dynamic_bytes_lag_threshold:95.0")); nameSizeMap.put("a", (long) (2 * avg)); pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap);