Skip to content

Commit

Permalink
Incorporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed May 2, 2023
1 parent 008a0fa commit c82ffb7
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void validateSegmentsUploadLag(ShardId shardId) {
RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId);
// Check if refresh checkpoint (a.k.a. seq number) lag is 2 or below - this is to handle segment merges that can
// increase the bytes to upload almost suddenly.
if (remoteRefreshSegmentTracker.getSeqNoLag() <= 1) {
if (remoteRefreshSegmentTracker.getRefreshSeqNoLag() <= 1) {
return;
}

Expand Down Expand Up @@ -162,9 +162,7 @@ private LagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) {
*
* @return the name using class name.
*/
final String name() {
return this.getClass().getSimpleName();
}
abstract String name();

abstract String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId);
}
Expand All @@ -176,14 +174,16 @@ final String name() {
*/
private static class RefreshSeqNoLagValidator extends LagValidator {

private static final String NAME = "refresh_seq_no_lag";

private RefreshSeqNoLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) {
super(pressureSettings);
}

@Override
public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) {
// Check if the remote store seq no lag is above the min seq no lag limit
return pressureTracker.getSeqNoLag() <= pressureSettings.getMinSeqNoLagLimit();
return pressureTracker.getRefreshSeqNoLag() <= pressureSettings.getMinRefreshSeqNoLagLimit();
}

@Override
Expand All @@ -197,6 +197,11 @@ String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId sha
pressureTracker.getLocalRefreshSeqNo()
);
}

@Override
String name() {
return NAME;
}
}

/**
Expand All @@ -206,6 +211,8 @@ String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId sha
*/
private static class BytesLagValidator extends LagValidator {

private static final String NAME = "bytes_lag";

private BytesLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) {
super(pressureSettings);
}
Expand Down Expand Up @@ -233,6 +240,11 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar
dynamicBytesLagThreshold
);
}

@Override
String name() {
return NAME;
}
}

/**
Expand All @@ -242,6 +254,8 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar
*/
private static class TimeLagValidator extends LagValidator {

private static final String NAME = "time_lag";

private TimeLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) {
super(pressureSettings);
}
Expand Down Expand Up @@ -269,6 +283,11 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar
dynamicTimeLagThreshold
);
}

@Override
String name() {
return NAME;
}
}

/**
Expand All @@ -278,6 +297,8 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar
*/
private static class ConsecutiveFailureValidator extends LagValidator {

private static final String NAME = "consecutive_failures_lag";

private ConsecutiveFailureValidator(RemoteRefreshSegmentPressureSettings pressureSettings) {
super(pressureSettings);
}
Expand All @@ -300,5 +321,10 @@ public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, Shar
pressureSettings.getMinConsecutiveFailuresLimit()
);
}

@Override
String name() {
return NAME;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private static class Defaults {

private volatile boolean remoteRefreshSegmentPressureEnabled;

private volatile long minSeqNoLagLimit;
private volatile long minRefreshSeqNoLagLimit;

private volatile double bytesLagVarianceFactor;

Expand All @@ -123,8 +123,8 @@ public RemoteRefreshSegmentPressureSettings(
this.remoteRefreshSegmentPressureEnabled = REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, this::setRemoteRefreshSegmentPressureEnabled);

this.minSeqNoLagLimit = MIN_SEQ_NO_LAG_LIMIT.get(settings);
clusterSettings.addSettingsUpdateConsumer(MIN_SEQ_NO_LAG_LIMIT, this::setMinSeqNoLagLimit);
this.minRefreshSeqNoLagLimit = MIN_SEQ_NO_LAG_LIMIT.get(settings);
clusterSettings.addSettingsUpdateConsumer(MIN_SEQ_NO_LAG_LIMIT, this::setMinRefreshSeqNoLagLimit);

this.bytesLagVarianceFactor = BYTES_LAG_VARIANCE_FACTOR.get(settings);
clusterSettings.addSettingsUpdateConsumer(BYTES_LAG_VARIANCE_FACTOR, this::setBytesLagVarianceFactor);
Expand Down Expand Up @@ -168,12 +168,12 @@ public void setRemoteRefreshSegmentPressureEnabled(boolean remoteRefreshSegmentP
this.remoteRefreshSegmentPressureEnabled = remoteRefreshSegmentPressureEnabled;
}

public long getMinSeqNoLagLimit() {
return minSeqNoLagLimit;
public long getMinRefreshSeqNoLagLimit() {
return minRefreshSeqNoLagLimit;
}

public void setMinSeqNoLagLimit(long minSeqNoLagLimit) {
this.minSeqNoLagLimit = minSeqNoLagLimit;
public void setMinRefreshSeqNoLagLimit(long minRefreshSeqNoLagLimit) {
this.minRefreshSeqNoLagLimit = minRefreshSeqNoLagLimit;
}

public double getBytesLagVarianceFactor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class RemoteRefreshSegmentTracker {
/**
* Keeps the seq no lag computed so that we do not compute it for every request.
*/
private volatile long seqNoLag;
private volatile long refreshSeqNoLag;

/**
* Keeps the time (ms) lag computed so that we do not compute it for every request.
Expand Down Expand Up @@ -180,7 +180,7 @@ void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
+ "currentLocalRefreshSeqNo="
+ this.localRefreshSeqNo;
this.localRefreshSeqNo = localRefreshSeqNo;
computeSeqNoLag();
computeRefreshSeqNoLag();
}

long getLocalRefreshTimeMs() {
Expand Down Expand Up @@ -208,7 +208,7 @@ void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) {
+ "currentRemoteRefreshSeqNo="
+ this.remoteRefreshSeqNo;
this.remoteRefreshSeqNo = remoteRefreshSeqNo;
computeSeqNoLag();
computeRefreshSeqNoLag();
}

long getRemoteRefreshTimeMs() {
Expand All @@ -225,12 +225,12 @@ void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
computeTimeMsLag();
}

private void computeSeqNoLag() {
seqNoLag = localRefreshSeqNo - remoteRefreshSeqNo;
private void computeRefreshSeqNoLag() {
refreshSeqNoLag = localRefreshSeqNo - remoteRefreshSeqNo;
}

long getSeqNoLag() {
return seqNoLag;
long getRefreshSeqNoLag() {
return refreshSeqNoLag;
}

private void computeTimeMsLag() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void testGetDefaultSettings() {
assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check min sequence number lag limit default value
assertEquals(5L, pressureSettings.getMinSeqNoLagLimit());
assertEquals(5L, pressureSettings.getMinRefreshSeqNoLagLimit());

// Check bytes lag variance threshold default value
assertEquals(2.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d);
Expand Down Expand Up @@ -97,7 +97,7 @@ public void testGetConfiguredSettings() {
assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check min sequence number lag limit configured value
assertEquals(100L, pressureSettings.getMinSeqNoLagLimit());
assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit());

// Check bytes lag variance threshold configured value
assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d);
Expand Down Expand Up @@ -141,7 +141,7 @@ public void testUpdateAfterGetDefaultSettings() {
assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check min sequence number lag limit
assertEquals(100L, pressureSettings.getMinSeqNoLagLimit());
assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit());

// Check bytes lag variance threshold updated
assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d);
Expand Down Expand Up @@ -195,7 +195,7 @@ public void testUpdateAfterGetConfiguredSettings() {
assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check min sequence number lag limit
assertEquals(80L, pressureSettings.getMinSeqNoLagLimit());
assertEquals(80L, pressureSettings.getMinRefreshSeqNoLagLimit());

// Check bytes lag variance threshold updated
assertEquals(40.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ public void testComputeSeqNoLagOnUpdate() {
int localRefreshSeqNo = randomIntBetween(50, 100);
int remoteRefreshSeqNo = randomIntBetween(20, 50);
pressureTracker.updateLocalRefreshSeqNo(localRefreshSeqNo);
assertEquals(localRefreshSeqNo, pressureTracker.getSeqNoLag());
assertEquals(localRefreshSeqNo, pressureTracker.getRefreshSeqNoLag());
pressureTracker.updateRemoteRefreshSeqNo(remoteRefreshSeqNo);
assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, pressureTracker.getSeqNoLag());
assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, pressureTracker.getRefreshSeqNoLag());
}

public void testComputeTimeLagOnUpdate() {
Expand Down

0 comments on commit c82ffb7

Please sign in to comment.