From c82ffb775b582a53b6879f57e3c5da3d6e95113f Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 2 May 2023 16:31:07 +0530 Subject: [PATCH] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../RemoteRefreshSegmentPressureService.java | 36 ++++++++++++++++--- .../RemoteRefreshSegmentPressureSettings.java | 14 ++++---- .../remote/RemoteRefreshSegmentTracker.java | 14 ++++---- ...teRefreshSegmentPressureSettingsTests.java | 8 ++--- .../RemoteRefreshSegmentTrackerTests.java | 4 +-- 5 files changed, 51 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java index 02b8aacd184ee..37935cc0eb29d 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -107,7 +107,7 @@ public void validateSegmentsUploadLag(ShardId shardId) { RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId); // Check if refresh checkpoint (a.k.a. seq number) lag is 2 or below - this is to handle segment merges that can // increase the bytes to upload almost suddenly. - if (remoteRefreshSegmentTracker.getSeqNoLag() <= 1) { + if (remoteRefreshSegmentTracker.getRefreshSeqNoLag() <= 1) { return; } @@ -162,9 +162,7 @@ private LagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { * * @return the name using class name. */ - final String name() { - return this.getClass().getSimpleName(); - } + abstract String name(); abstract String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId); } @@ -176,6 +174,8 @@ final String name() { */ private static class RefreshSeqNoLagValidator extends LagValidator { + private static final String NAME = "refresh_seq_no_lag"; + private RefreshSeqNoLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { super(pressureSettings); } @@ -183,7 +183,7 @@ private RefreshSeqNoLagValidator(RemoteRefreshSegmentPressureSettings pressureSe @Override public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { // Check if the remote store seq no lag is above the min seq no lag limit - return pressureTracker.getSeqNoLag() <= pressureSettings.getMinSeqNoLagLimit(); + return pressureTracker.getRefreshSeqNoLag() <= pressureSettings.getMinRefreshSeqNoLagLimit(); } @Override @@ -197,6 +197,11 @@ String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId sha pressureTracker.getLocalRefreshSeqNo() ); } + + @Override + String name() { + return NAME; + } } /** @@ -206,6 +211,8 @@ String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId sha */ private static class BytesLagValidator extends LagValidator { + private static final String NAME = "bytes_lag"; + private BytesLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { super(pressureSettings); } @@ -233,6 +240,11 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar dynamicBytesLagThreshold ); } + + @Override + String name() { + return NAME; + } } /** @@ -242,6 +254,8 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar */ private static class TimeLagValidator extends LagValidator { + private static final String NAME = "time_lag"; + private TimeLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { super(pressureSettings); } @@ -269,6 +283,11 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar dynamicTimeLagThreshold ); } + + @Override + String name() { + return NAME; + } } /** @@ -278,6 +297,8 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar */ private static class ConsecutiveFailureValidator extends LagValidator { + private static final String NAME = "consecutive_failures_lag"; + private ConsecutiveFailureValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { super(pressureSettings); } @@ -300,5 +321,10 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar pressureSettings.getMinConsecutiveFailuresLimit() ); } + + @Override + String name() { + return NAME; + } } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java index 95bd64d6935a6..6cb0d1d07e78b 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java @@ -99,7 +99,7 @@ private static class Defaults { private volatile boolean remoteRefreshSegmentPressureEnabled; - private volatile long minSeqNoLagLimit; + private volatile long minRefreshSeqNoLagLimit; private volatile double bytesLagVarianceFactor; @@ -123,8 +123,8 @@ public RemoteRefreshSegmentPressureSettings( this.remoteRefreshSegmentPressureEnabled = REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, this::setRemoteRefreshSegmentPressureEnabled); - this.minSeqNoLagLimit = MIN_SEQ_NO_LAG_LIMIT.get(settings); - clusterSettings.addSettingsUpdateConsumer(MIN_SEQ_NO_LAG_LIMIT, this::setMinSeqNoLagLimit); + this.minRefreshSeqNoLagLimit = MIN_SEQ_NO_LAG_LIMIT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MIN_SEQ_NO_LAG_LIMIT, this::setMinRefreshSeqNoLagLimit); this.bytesLagVarianceFactor = BYTES_LAG_VARIANCE_FACTOR.get(settings); clusterSettings.addSettingsUpdateConsumer(BYTES_LAG_VARIANCE_FACTOR, this::setBytesLagVarianceFactor); @@ -168,12 +168,12 @@ public void setRemoteRefreshSegmentPressureEnabled(boolean remoteRefreshSegmentP this.remoteRefreshSegmentPressureEnabled = remoteRefreshSegmentPressureEnabled; } - public long getMinSeqNoLagLimit() { - return minSeqNoLagLimit; + public long getMinRefreshSeqNoLagLimit() { + return minRefreshSeqNoLagLimit; } - public void setMinSeqNoLagLimit(long minSeqNoLagLimit) { - this.minSeqNoLagLimit = minSeqNoLagLimit; + public void setMinRefreshSeqNoLagLimit(long minRefreshSeqNoLagLimit) { + this.minRefreshSeqNoLagLimit = minRefreshSeqNoLagLimit; } public double getBytesLagVarianceFactor() { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index 50a9524291b4a..109eadf34509b 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -55,7 +55,7 @@ public class RemoteRefreshSegmentTracker { /** * Keeps the seq no lag computed so that we do not compute it for every request. */ - private volatile long seqNoLag; + private volatile long refreshSeqNoLag; /** * Keeps the time (ms) lag computed so that we do not compute it for every request. @@ -180,7 +180,7 @@ void updateLocalRefreshSeqNo(long localRefreshSeqNo) { + "currentLocalRefreshSeqNo=" + this.localRefreshSeqNo; this.localRefreshSeqNo = localRefreshSeqNo; - computeSeqNoLag(); + computeRefreshSeqNoLag(); } long getLocalRefreshTimeMs() { @@ -208,7 +208,7 @@ void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { + "currentRemoteRefreshSeqNo=" + this.remoteRefreshSeqNo; this.remoteRefreshSeqNo = remoteRefreshSeqNo; - computeSeqNoLag(); + computeRefreshSeqNoLag(); } long getRemoteRefreshTimeMs() { @@ -225,12 +225,12 @@ void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { computeTimeMsLag(); } - private void computeSeqNoLag() { - seqNoLag = localRefreshSeqNo - remoteRefreshSeqNo; + private void computeRefreshSeqNoLag() { + refreshSeqNoLag = localRefreshSeqNo - remoteRefreshSeqNo; } - long getSeqNoLag() { - return seqNoLag; + long getRefreshSeqNoLag() { + return refreshSeqNoLag; } private void computeTimeMsLag() { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java index ed88f91ebd062..66b5d6c4c19d8 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java @@ -55,7 +55,7 @@ public void testGetDefaultSettings() { assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); // Check min sequence number lag limit default value - assertEquals(5L, pressureSettings.getMinSeqNoLagLimit()); + assertEquals(5L, pressureSettings.getMinRefreshSeqNoLagLimit()); // Check bytes lag variance threshold default value assertEquals(2.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); @@ -97,7 +97,7 @@ public void testGetConfiguredSettings() { assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); // Check min sequence number lag limit configured value - assertEquals(100L, pressureSettings.getMinSeqNoLagLimit()); + assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit()); // Check bytes lag variance threshold configured value assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); @@ -141,7 +141,7 @@ public void testUpdateAfterGetDefaultSettings() { assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); // Check min sequence number lag limit - assertEquals(100L, pressureSettings.getMinSeqNoLagLimit()); + assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit()); // Check bytes lag variance threshold updated assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); @@ -195,7 +195,7 @@ public void testUpdateAfterGetConfiguredSettings() { assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); // Check min sequence number lag limit - assertEquals(80L, pressureSettings.getMinSeqNoLagLimit()); + assertEquals(80L, pressureSettings.getMinRefreshSeqNoLagLimit()); // Check bytes lag variance threshold updated assertEquals(40.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index ffa591c3c886b..48bc28e3a497d 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -124,9 +124,9 @@ public void testComputeSeqNoLagOnUpdate() { int localRefreshSeqNo = randomIntBetween(50, 100); int remoteRefreshSeqNo = randomIntBetween(20, 50); pressureTracker.updateLocalRefreshSeqNo(localRefreshSeqNo); - assertEquals(localRefreshSeqNo, pressureTracker.getSeqNoLag()); + assertEquals(localRefreshSeqNo, pressureTracker.getRefreshSeqNoLag()); pressureTracker.updateRemoteRefreshSeqNo(remoteRefreshSeqNo); - assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, pressureTracker.getSeqNoLag()); + assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, pressureTracker.getRefreshSeqNoLag()); } public void testComputeTimeLagOnUpdate() {