Skip to content

Commit

Permalink
Add remote refresh segment pressure service, settings and tracker (op…
Browse files Browse the repository at this point in the history
  • Loading branch information
ashking94 authored May 23, 2023
1 parent eef0181 commit 80aeb76
Show file tree
Hide file tree
Showing 10 changed files with 1,914 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
Expand Down Expand Up @@ -640,7 +641,17 @@ public void apply(Settings value, Settings current, Settings previous) {
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING
Node.NODE_SEARCH_CACHE_SIZE_SETTING,

// Settings related to Remote Refresh Segment Pressure
RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT,
RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR,
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR,
RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE
)
)
);
Expand Down
42 changes: 35 additions & 7 deletions server/src/main/java/org/opensearch/common/util/MovingAverage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,47 @@ public class MovingAverage {
private final int windowSize;
private final long[] observations;

private long count = 0;
private long sum = 0;
private double average = 0;
private volatile long count = 0;
private volatile long sum = 0;
private volatile double average = 0;

public MovingAverage(int windowSize) {
if (windowSize <= 0) {
throw new IllegalArgumentException("window size must be greater than zero");
}

checkWindowSize(windowSize);
this.windowSize = windowSize;
this.observations = new long[windowSize];
}

/**
* Used for changing the window size of {@code MovingAverage}.
*
* @param newWindowSize new window size.
* @return copy of original object with updated size.
*/
public MovingAverage copyWithSize(int newWindowSize) {
MovingAverage copy = new MovingAverage(newWindowSize);
// Start is inclusive, but end is exclusive
long start, end = count;
if (isReady() == false) {
start = 0;
} else {
start = end - windowSize;
}
// If the newWindow Size is smaller than the elements eligible to be copied over, then we adjust the start value
if (end - start > newWindowSize) {
start = end - newWindowSize;
}
for (int i = (int) start; i < end; i++) {
copy.record(observations[i % observations.length]);
}
return copy;
}

private void checkWindowSize(int size) {
if (size <= 0) {
throw new IllegalArgumentException("window size must be greater than zero");
}
}

/**
* Records a new observation and evicts the n-th last observation.
*/
Expand Down
Loading

0 comments on commit 80aeb76

Please sign in to comment.