Skip to content

Commit

Permalink
Increase the default limit for variance factor
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed May 19, 2023
1 parent dc378e8 commit 4803283
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,9 @@ public boolean isSegmentsUploadBackpressureEnabled() {
*/
public void validateSegmentsUploadLag(ShardId shardId) {
RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId);
// This will be null for non-remote backed indexes
if (remoteRefreshSegmentTracker == null) {
return;
}
// Check if refresh checkpoint (a.k.a. seq number) lag is 1 or below - this is to handle segment merges that can
// abruptly increase the bytes to upload.
if (remoteRefreshSegmentTracker.getRefreshSeqNoLag() <= 1) {
// condition 1 - This will be null for non-remote backed indexes
// condition 2 - This will be zero if the remote store is in sync with the local store (no refresh seq number lag)
if (remoteRefreshSegmentTracker == null || remoteRefreshSegmentTracker.getRefreshSeqNoLag() == 0) {
return;
}

Expand Down Expand Up @@ -189,6 +185,9 @@ private BytesLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings)

@Override
public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) {
if (pressureTracker.getRefreshSeqNoLag() <= 1) {
return true;
}
if (pressureTracker.isUploadBytesAverageReady() == false) {
logger.trace("upload bytes moving average is not ready");
return true;
Expand Down Expand Up @@ -232,6 +231,9 @@ private TimeLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings)

@Override
public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) {
if (pressureTracker.getRefreshSeqNoLag() <= 1) {
return true;
}
if (pressureTracker.isUploadTimeMsAverageReady() == false) {
logger.trace("upload time moving average is not ready");
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
public class RemoteRefreshSegmentPressureSettings {

private static class Defaults {
private static final double BYTES_LAG_VARIANCE_FACTOR = 2.0;
private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 2.0;
private static final double BYTES_LAG_VARIANCE_FACTOR = 10.0;
private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 10.0;
private static final double VARIANCE_FACTOR_MIN_VALUE = 1.0;
private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 5;
private static final int MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ public void testValidateSegmentUploadLag() {
});
double avg = (double) sum.get() / 20;
long currentMs = System.nanoTime() / 1_000_000;
pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 4 * avg));
pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 12 * avg));
pressureTracker.updateRemoteRefreshTimeMs(currentMs);
Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments"));
assertTrue(e.getMessage().contains("time_lag:38 ms dynamic_time_lag_threshold:19.0 ms"));
assertTrue(e.getMessage().contains("time_lag:114 ms dynamic_time_lag_threshold:95.0 ms"));

pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg));
pressureService.validateSegmentsUploadLag(shardId);
Expand All @@ -128,11 +128,11 @@ public void testValidateSegmentUploadLag() {
});
avg = (double) sum.get() / 20;
Map<String, Long> nameSizeMap = new HashMap<>();
nameSizeMap.put("a", (long) (4 * avg));
nameSizeMap.put("a", (long) (12 * avg));
pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap);
e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments"));
assertTrue(e.getMessage().contains("bytes_lag:38 dynamic_bytes_lag_threshold:19.0"));
assertTrue(e.getMessage().contains("bytes_lag:114 dynamic_bytes_lag_threshold:95.0"));

nameSizeMap.put("a", (long) (2 * avg));
pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap);
Expand Down

0 comments on commit 4803283

Please sign in to comment.