Skip to content

Commit

Permalink
[Remote Store] Add Remote Store backpressure rejection stats to _node…
Browse files Browse the repository at this point in the history
…s/stats (opensearch-project#10524)

Signed-off-by: Bhumika Saini <[email protected]>
Signed-off-by: Siddhant Deshmukh <[email protected]>
  • Loading branch information
Bhumika Saini authored and deshsidd committed Oct 18, 2023
1 parent 7cf6dfe commit 46eb23c
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Dependencies

### Changed
- [Remote Store] Add Remote Store backpressure rejection stats to `_nodes/stats` ([#10524](https://github.com/opensearch-project/OpenSearch/pull/10524))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.remote;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -75,6 +76,10 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Total time spent in downloading segments from remote store
*/
private long totalDownloadTime;
/**
* Total rejections due to remote store upload backpressure
*/
private long totalRejections;

public RemoteSegmentStats() {}

Expand All @@ -90,6 +95,10 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
totalRefreshBytesLag = in.readLong();
totalUploadTime = in.readLong();
totalDownloadTime = in.readLong();
// TODO: change to V_2_12_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
totalRejections = in.readVLong();
}
}

/**
Expand All @@ -115,6 +124,7 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
this.totalRefreshBytesLag = trackerStats.bytesLag;
this.totalUploadTime = trackerStats.totalUploadTimeInMs;
this.totalDownloadTime = trackerStats.directoryFileTransferTrackerStats.totalTransferTimeInMs;
this.totalRejections = trackerStats.rejectionCount;
}

// Getter and setters. All are visible for testing
Expand Down Expand Up @@ -207,6 +217,14 @@ public void addTotalDownloadTime(long totalDownloadTime) {
this.totalDownloadTime += totalDownloadTime;
}

public long getTotalRejections() {
return totalRejections;
}

public void addTotalRejections(long totalRejections) {
this.totalRejections += totalRejections;
}

/**
* Adds existing stats. Used for stats roll-ups at index or node level
*
Expand All @@ -225,6 +243,7 @@ public void add(RemoteSegmentStats existingStats) {
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
this.totalUploadTime += existingStats.getTotalUploadTime();
this.totalDownloadTime += existingStats.getTotalDownloadTime();
this.totalRejections += existingStats.totalRejections;
}
}

Expand All @@ -241,18 +260,26 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(totalRefreshBytesLag);
out.writeLong(totalUploadTime);
out.writeLong(totalDownloadTime);
// TODO: change to V_2_12_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(totalRejections);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REMOTE_STORE);

builder.startObject(Fields.UPLOAD);
buildUploadStats(builder);
builder.endObject();
builder.endObject(); // UPLOAD

builder.startObject(Fields.DOWNLOAD);
buildDownloadStats(builder);
builder.endObject();
builder.endObject();
builder.endObject(); // DOWNLOAD

builder.endObject(); // REMOTE_STORE

return builder;
}

Expand All @@ -261,13 +288,19 @@ private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(uploadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(uploadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed));
builder.endObject();
builder.endObject(); // TOTAL_UPLOAD_SIZE

builder.startObject(Fields.REFRESH_SIZE_LAG);
builder.humanReadableField(Fields.TOTAL_BYTES, Fields.TOTAL, new ByteSizeValue(totalRefreshBytesLag));
builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.endObject(); // REFRESH_SIZE_LAG

builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalUploadTime));

builder.startObject(Fields.PRESSURE);
builder.field(Fields.TOTAL_REJECTIONS, totalRejections);
builder.endObject(); // PRESSURE
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
Expand Down Expand Up @@ -300,6 +333,8 @@ static final class Fields {
static final String MAX_BYTES = "max_bytes";
static final String TOTAL_TIME_SPENT = "total_time_spent";
static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis";
static final String PRESSURE = "pressure";
static final String TOTAL_REJECTIONS = "total_rejections";
}

@Override
Expand All @@ -318,7 +353,8 @@ public boolean equals(Object o) {
&& maxRefreshBytesLag == that.maxRefreshBytesLag
&& totalRefreshBytesLag == that.totalRefreshBytesLag
&& totalUploadTime == that.totalUploadTime
&& totalDownloadTime == that.totalDownloadTime;
&& totalDownloadTime == that.totalDownloadTime
&& totalRejections == that.totalRejections;
}

@Override
Expand All @@ -334,7 +370,8 @@ public int hashCode() {
maxRefreshBytesLag,
totalRefreshBytesLag,
totalUploadTime,
totalDownloadTime
totalDownloadTime,
totalRejections
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ public long getRejectionCount() {
return rejectionCount.get();
}

void incrementRejectionCount() {
/** public only for testing **/
public void incrementRejectionCount() {
rejectionCount.incrementAndGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
remoteSegmentStats.setMaxRefreshTimeLag(2L);
remoteSegmentStats.addTotalUploadTime(20L);
remoteSegmentStats.addTotalDownloadTime(20L);
remoteSegmentStats.addTotalRejections(5L);

RemoteTranslogStats remoteTranslogStats = indicesStats.getTranslog().getRemoteTranslogStats();
RemoteTranslogStats otherRemoteTranslogStats = new RemoteTranslogStats(getRandomRemoteTranslogTransferTrackerStats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4910,6 +4910,8 @@ private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker track
tracker.addUploadBytesStarted(30L);
tracker.addUploadBytesSucceeded(10L);
tracker.addUploadBytesFailed(10L);
tracker.incrementRejectionCount();
tracker.incrementRejectionCount();
}

private void populateSampleRemoteTranslogStats(RemoteTranslogTransferTracker tracker) {
Expand Down Expand Up @@ -4943,5 +4945,7 @@ private static void assertRemoteSegmentStats(
assertEquals(remoteSegmentTransferTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted());
assertEquals(remoteSegmentTransferTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(remoteSegmentTransferTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed());
assertTrue(remoteSegmentStats.getTotalRejections() > 0);
assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections());
}
}

0 comments on commit 46eb23c

Please sign in to comment.