Skip to content

Commit

Permalink
Add remote refresh lag in millis from local refresh (opensearch-proje…
Browse files Browse the repository at this point in the history
…ct#7694)

Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi authored May 24, 2023
1 parent 49fafd1 commit ece27af
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject()
.field(Fields.SHARD_ID, remoteSegmentUploadShardStats.shardId)

.field(Fields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.localRefreshTimeMs)
.field(Fields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.remoteRefreshTimeMs)
.field(Fields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentUploadShardStats.refreshTimeLagMs)
.field(Fields.REFRESH_LAG, remoteSegmentUploadShardStats.localRefreshNumber - remoteSegmentUploadShardStats.remoteRefreshNumber)
.field(Fields.BYTES_LAG, remoteSegmentUploadShardStats.bytesLag)

Expand Down Expand Up @@ -90,16 +89,6 @@ public void writeTo(StreamOutput out) throws IOException {
static final class Fields {
static final String SHARD_ID = "shard_id";

/**
* Last successful local refresh timestamp in milliseconds
*/
static final String LOCAL_REFRESH_TIMESTAMP = "local_refresh_timestamp_in_millis";

/**
* Last successful remote refresh timestamp in milliseconds
*/
static final String REMOTE_REFRESH_TIMESTAMP = "remote_refresh_timestamp_in_millis";

/**
* Lag in terms of bytes b/w local and remote store
*/
Expand All @@ -110,6 +99,11 @@ static final class Fields {
*/
static final String REFRESH_LAG = "refresh_lag";

/**
* Time in millis remote refresh is behind local refresh
*/
static final String REFRESH_TIME_LAG_IN_MILLIS = "refresh_time_lag_in_millis";

/**
* Total write rejections due to remote store backpressure kick in
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,9 @@ void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) {
public RemoteRefreshSegmentTracker.Stats stats() {
return new RemoteRefreshSegmentTracker.Stats(
shardId,
timeMsLag,
localRefreshSeqNo,
localRefreshTimeMs,
remoteRefreshSeqNo,
remoteRefreshTimeMs,
uploadBytesStarted,
uploadBytesSucceeded,
uploadBytesFailed,
Expand All @@ -473,10 +472,9 @@ public RemoteRefreshSegmentTracker.Stats stats() {
public static class Stats implements Writeable {

public final ShardId shardId;
public final long refreshTimeLagMs;
public final long localRefreshNumber;
public final long localRefreshTimeMs;
public final long remoteRefreshNumber;
public final long remoteRefreshTimeMs;
public final long uploadBytesStarted;
public final long uploadBytesFailed;
public final long uploadBytesSucceeded;
Expand All @@ -493,10 +491,9 @@ public static class Stats implements Writeable {

public Stats(
ShardId shardId,
long refreshTimeLagMs,
long localRefreshNumber,
long localRefreshTimeMs,
long remoteRefreshNumber,
long remoteRefreshTimeMs,
long uploadBytesStarted,
long uploadBytesSucceeded,
long uploadBytesFailed,
Expand All @@ -512,10 +509,9 @@ public Stats(
long bytesLag
) {
this.shardId = shardId;
this.refreshTimeLagMs = refreshTimeLagMs;
this.localRefreshNumber = localRefreshNumber;
this.localRefreshTimeMs = localRefreshTimeMs;
this.remoteRefreshNumber = remoteRefreshNumber;
this.remoteRefreshTimeMs = remoteRefreshTimeMs;
this.uploadBytesStarted = uploadBytesStarted;
this.uploadBytesFailed = uploadBytesFailed;
this.uploadBytesSucceeded = uploadBytesSucceeded;
Expand All @@ -534,10 +530,9 @@ public Stats(
public Stats(StreamInput in) throws IOException {
try {
this.shardId = new ShardId(in);
this.refreshTimeLagMs = in.readLong();
this.localRefreshNumber = in.readLong();
this.localRefreshTimeMs = in.readLong();
this.remoteRefreshNumber = in.readLong();
this.remoteRefreshTimeMs = in.readLong();
this.uploadBytesStarted = in.readLong();
this.uploadBytesFailed = in.readLong();
this.uploadBytesSucceeded = in.readLong();
Expand All @@ -559,10 +554,9 @@ public Stats(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(refreshTimeLagMs);
out.writeLong(localRefreshNumber);
out.writeLong(localRefreshTimeMs);
out.writeLong(remoteRefreshNumber);
out.writeLong(remoteRefreshTimeMs);
out.writeLong(uploadBytesStarted);
out.writeLong(uploadBytesFailed);
out.writeLong(uploadBytesSucceeded);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,22 @@
import java.util.Map;

import static org.opensearch.test.OpenSearchTestCase.assertEquals;
import static org.opensearch.test.OpenSearchTestCase.randomIntBetween;

/**
* Helper utilities for Remote Store stats tests
*/
public class RemoteStoreStatsTestHelper {
static RemoteRefreshSegmentTracker.Stats createPressureTrackerStats(ShardId shardId) {
return new RemoteRefreshSegmentTracker.Stats(
shardId,
3,
System.nanoTime() / 1_000_000L + randomIntBetween(10, 100),
2,
System.nanoTime() / 1_000_000L + randomIntBetween(10, 100),
10,
5,
5,
10,
5,
5,
3,
2,
5,
2,
3,
4,
9
);
return new RemoteRefreshSegmentTracker.Stats(shardId, 100, 3, 2, 10, 5, 5, 10, 5, 5, 3, 2, 5, 2, 3, 4, 9);
}

static void compareStatsResponse(Map<String, Object> statsObject, RemoteRefreshSegmentTracker.Stats pressureTrackerStats) {
assertEquals(statsObject.get(RemoteStoreStats.Fields.SHARD_ID), pressureTrackerStats.shardId.toString());
assertEquals(statsObject.get(RemoteStoreStats.Fields.LOCAL_REFRESH_TIMESTAMP), (int) pressureTrackerStats.localRefreshTimeMs);
assertEquals(statsObject.get(RemoteStoreStats.Fields.REFRESH_TIME_LAG_IN_MILLIS), (int) pressureTrackerStats.refreshTimeLagMs);
assertEquals(
statsObject.get(RemoteStoreStats.Fields.REFRESH_LAG),
(int) (pressureTrackerStats.localRefreshNumber - pressureTrackerStats.remoteRefreshNumber)
);
assertEquals(statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_TIMESTAMP), (int) pressureTrackerStats.remoteRefreshTimeMs);
assertEquals(statsObject.get(RemoteStoreStats.Fields.BYTES_LAG), (int) pressureTrackerStats.bytesLag);

assertEquals(statsObject.get(RemoteStoreStats.Fields.BACKPRESSURE_REJECTION_COUNT), (int) pressureTrackerStats.rejectionCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ public void testSerialization() throws Exception {
try (StreamInput in = out.bytes().streamInput()) {
RemoteStoreStats deserializedStats = new RemoteStoreStats(in);
assertEquals(deserializedStats.getStats().shardId.toString(), stats.getStats().shardId.toString());
assertEquals(deserializedStats.getStats().refreshTimeLagMs, stats.getStats().refreshTimeLagMs);
assertEquals(deserializedStats.getStats().localRefreshNumber, stats.getStats().localRefreshNumber);
assertEquals(deserializedStats.getStats().localRefreshTimeMs, stats.getStats().localRefreshTimeMs);
assertEquals(deserializedStats.getStats().remoteRefreshNumber, stats.getStats().remoteRefreshNumber);
assertEquals(deserializedStats.getStats().remoteRefreshTimeMs, stats.getStats().remoteRefreshTimeMs);
assertEquals(deserializedStats.getStats().uploadBytesStarted, stats.getStats().uploadBytesStarted);
assertEquals(deserializedStats.getStats().uploadBytesSucceeded, stats.getStats().uploadBytesSucceeded);
assertEquals(deserializedStats.getStats().uploadBytesFailed, stats.getStats().uploadBytesFailed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,8 @@ public void testStatsObjectCreation() {
pressureTracker = constructTracker();
RemoteRefreshSegmentTracker.Stats pressureTrackerStats = pressureTracker.stats();
assertEquals(pressureTracker.getShardId(), pressureTrackerStats.shardId);
assertEquals(pressureTracker.getLocalRefreshTimeMs(), (int) pressureTrackerStats.localRefreshTimeMs);
assertEquals(pressureTracker.getTimeMsLag(), (int) pressureTrackerStats.refreshTimeLagMs);
assertEquals(pressureTracker.getLocalRefreshSeqNo(), (int) pressureTrackerStats.localRefreshNumber);
assertEquals(pressureTracker.getRemoteRefreshTimeMs(), (int) pressureTrackerStats.remoteRefreshTimeMs);
assertEquals(pressureTracker.getRemoteRefreshSeqNo(), (int) pressureTrackerStats.remoteRefreshNumber);
assertEquals(pressureTracker.getBytesLag(), (int) pressureTrackerStats.bytesLag);
assertEquals(pressureTracker.getRejectionCount(), (int) pressureTrackerStats.rejectionCount);
Expand Down Expand Up @@ -441,9 +440,8 @@ public void testStatsObjectCreationViaStream() throws IOException {
try (StreamInput in = out.bytes().streamInput()) {
RemoteRefreshSegmentTracker.Stats deserializedStats = new RemoteRefreshSegmentTracker.Stats(in);
assertEquals(deserializedStats.shardId, pressureTrackerStats.shardId);
assertEquals((int) deserializedStats.localRefreshTimeMs, (int) pressureTrackerStats.localRefreshTimeMs);
assertEquals((int) deserializedStats.refreshTimeLagMs, (int) pressureTrackerStats.refreshTimeLagMs);
assertEquals((int) deserializedStats.localRefreshNumber, (int) pressureTrackerStats.localRefreshNumber);
assertEquals((int) deserializedStats.remoteRefreshTimeMs, (int) pressureTrackerStats.remoteRefreshTimeMs);
assertEquals((int) deserializedStats.remoteRefreshNumber, (int) pressureTrackerStats.remoteRefreshNumber);
assertEquals((int) deserializedStats.bytesLag, (int) pressureTrackerStats.bytesLag);
assertEquals((int) deserializedStats.rejectionCount, (int) pressureTrackerStats.rejectionCount);
Expand Down

0 comments on commit ece27af

Please sign in to comment.