Skip to content

Commit

Permalink
Add ShardIndexingPressure framework level construct and Stats (#1015)
Browse files Browse the repository at this point in the history
* Add ShardIndexingPressure framework level construct and related Stats artefacts.
* Test and code refactoring for shard indexing pressure.
* Moved the average calculation logic to common memory manager util.
* Add wrapper for releasable in ShardIndexingPressure operations.

Signed-off-by: Saurabh Singh <[email protected]>
  • Loading branch information
getsaurabh02 authored and adnapibar committed Sep 15, 2021
1 parent a4cf39d commit 1f22786
Show file tree
Hide file tree
Showing 8 changed files with 2,618 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.ShardIndexingPressureSettings;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -53,6 +54,8 @@ public class CommonStatsFlags implements Writeable, Cloneable {
private String[] completionDataFields = null;
private boolean includeSegmentFileSizes = false;
private boolean includeUnloadedSegments = false;
private boolean includeAllShardIndexingPressureTrackers = false;
private boolean includeOnlyTopIndexingPressureMetrics = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
Expand Down Expand Up @@ -80,6 +83,10 @@ public CommonStatsFlags(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
includeUnloadedSegments = in.readBoolean();
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
includeAllShardIndexingPressureTrackers = in.readBoolean();
includeOnlyTopIndexingPressureMetrics = in.readBoolean();
}
}

@Override
Expand All @@ -98,6 +105,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
out.writeBoolean(includeUnloadedSegments);
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
out.writeBoolean(includeAllShardIndexingPressureTrackers);
out.writeBoolean(includeOnlyTopIndexingPressureMetrics);
}
}

/**
Expand All @@ -111,6 +122,8 @@ public CommonStatsFlags all() {
completionDataFields = null;
includeSegmentFileSizes = false;
includeUnloadedSegments = false;
includeAllShardIndexingPressureTrackers = false;
includeOnlyTopIndexingPressureMetrics = false;
return this;
}

Expand All @@ -125,6 +138,8 @@ public CommonStatsFlags clear() {
completionDataFields = null;
includeSegmentFileSizes = false;
includeUnloadedSegments = false;
includeAllShardIndexingPressureTrackers = false;
includeOnlyTopIndexingPressureMetrics = false;
return this;
}

Expand Down Expand Up @@ -198,10 +213,28 @@ public CommonStatsFlags includeUnloadedSegments(boolean includeUnloadedSegments)
return this;
}

public CommonStatsFlags includeAllShardIndexingPressureTrackers(boolean includeAllShardPressureTrackers) {
this.includeAllShardIndexingPressureTrackers = includeAllShardPressureTrackers;
return this;
}

public CommonStatsFlags includeOnlyTopIndexingPressureMetrics(boolean includeOnlyTopIndexingPressureMetrics) {
this.includeOnlyTopIndexingPressureMetrics = includeOnlyTopIndexingPressureMetrics;
return this;
}

public boolean includeUnloadedSegments() {
return this.includeUnloadedSegments;
}

public boolean includeAllShardIndexingPressureTrackers() {
return this.includeAllShardIndexingPressureTrackers;
}

public boolean includeOnlyTopIndexingPressureMetrics() {
return this.includeOnlyTopIndexingPressureMetrics;
}

public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}
Expand Down
350 changes: 350 additions & 0 deletions server/src/main/java/org/opensearch/index/ShardIndexingPressure.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,26 @@ ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shardId) {
return shardIndexingPressureStore.getShardIndexingPressureTracker(shardId);
}

Map<ShardId, ShardIndexingPressureTracker> getShardIndexingPressureHotStore() {
return shardIndexingPressureStore.getShardIndexingPressureHotStore();
}

Map<ShardId, ShardIndexingPressureTracker> getShardIndexingPressureColdStore() {
return shardIndexingPressureStore.getShardIndexingPressureColdStore();
}

void tryTrackerCleanupFromHotStore(ShardIndexingPressureTracker tracker, BooleanSupplier condition) {
shardIndexingPressureStore.tryTrackerCleanupFromHotStore(tracker, condition);
}

double calculateMovingAverage(long currentAverage, double frontValue, double currentValue, int count) {
if(count > 0) {
return ((Double.longBitsToDouble(currentAverage) * count) + currentValue - frontValue) / count;
} else {
return currentValue;
}
}

long getTotalNodeLimitsBreachedRejections() {
return totalNodeLimitsBreachedRejections.get();
}
Expand Down Expand Up @@ -417,7 +437,7 @@ private boolean evaluateThroughputDegradationLimitsBreached(PerformanceTracker p
*/
private boolean evaluateLastSuccessfulRequestDurationLimitsBreached(PerformanceTracker performanceTracker, long requestStartTime) {
return (performanceTracker.getLastSuccessfulRequestTimestamp() > 0) &&
(requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.millis() &&
(requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.nanos() &&
performanceTracker.getTotalOutstandingRequests() > this.maxOutstandingRequests;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ public long incrementTotalBytes(long bytes) {
public long getRequestCount() {
return requestCount.get();
}

public long incrementRequestCount() {
return requestCount.incrementAndGet();
}
}

/**
Expand All @@ -161,6 +165,10 @@ public long getTotalRejections() {
return totalRejections.get();
}

public long incrementTotalRejections() {
return totalRejections.incrementAndGet();
}

public long getNodeLimitsBreachedRejections() {
return nodeLimitsBreachedRejections.get();
}
Expand Down Expand Up @@ -232,6 +240,10 @@ public long incrementTotalOutstandingRequests() {
return totalOutstandingRequests.incrementAndGet();
}

public void resetTotalOutstandingRequests() {
totalOutstandingRequests.set(0L);
}

public long getThroughputMovingAverage() {
return throughputMovingAverage.get();
}
Expand Down Expand Up @@ -275,5 +287,9 @@ public long incrementCurrentCombinedCoordinatingAndPrimaryBytes(long bytes) {
public long getTotalCombinedCoordinatingAndPrimaryBytes() {
return totalCombinedCoordinatingAndPrimaryBytes.get();
}

public long incrementTotalCombinedCoordinatingAndPrimaryBytes(long bytes) {
return totalCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
}
}
}
Loading

0 comments on commit 1f22786

Please sign in to comment.