Skip to content

Commit

Permalink
Add support for Remote Translog Store stats in _remotestore/stats/ API
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Aug 11, 2023
1 parent aba072e commit 2b56430
Show file tree
Hide file tree
Showing 20 changed files with 1,823 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTracker;

import java.io.IOException;

Expand All @@ -29,15 +30,25 @@ public class RemoteStoreStats implements Writeable, ToXContentFragment {
*/
private final RemoteSegmentTransferTracker.Stats remoteSegmentShardStats;

/**
* Stats related to Remote Translog Store operations
*/
private final RemoteTranslogTracker.Stats remoteTranslogShardStats;
private final ShardRouting shardRouting;

public RemoteStoreStats(RemoteSegmentTransferTracker.Stats remoteSegmentUploadShardStats, ShardRouting shardRouting) {
public RemoteStoreStats(
RemoteSegmentTransferTracker.Stats remoteSegmentUploadShardStats,
RemoteTranslogTracker.Stats remoteTranslogShardStats,
ShardRouting shardRouting
) {
this.remoteSegmentShardStats = remoteSegmentUploadShardStats;
this.remoteTranslogShardStats = remoteTranslogShardStats;
this.shardRouting = shardRouting;
}

public RemoteStoreStats(StreamInput in) throws IOException {
this.remoteSegmentShardStats = in.readOptionalWriteable(RemoteSegmentTransferTracker.Stats::new);
remoteSegmentShardStats = in.readOptionalWriteable(RemoteSegmentTransferTracker.Stats::new);
remoteTranslogShardStats = in.readOptionalWriteable(RemoteTranslogTracker.Stats::new);
this.shardRouting = new ShardRouting(in);
}

Expand All @@ -49,10 +60,15 @@ public ShardRouting getShardRouting() {
return shardRouting;
}

public RemoteTranslogTracker.Stats getTranslogStats() {
return remoteTranslogShardStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
buildShardRouting(builder);

builder.startObject(Fields.SEGMENT);
builder.startObject(SubFields.DOWNLOAD);
// Ensuring that we are not showing 0 metrics to the user
Expand All @@ -67,15 +83,82 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject(); // segment.upload
builder.endObject(); // segment

builder.startObject(Fields.TRANSLOG);
builder.startObject(SubFields.UPLOAD);
buildTranslogUploadStats(builder);
builder.endObject(); // translog.upload
builder.startObject(SubFields.DOWNLOAD);
buildTranslogDownloadStats(builder);
builder.endObject(); // translog.download
builder.endObject(); // translog

return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(remoteSegmentShardStats);
out.writeOptionalWriteable(remoteTranslogShardStats);
shardRouting.writeTo(out);
}

private void buildTranslogUploadStats(XContentBuilder builder) throws IOException {
builder.field(UploadStatsFields.LAST_SUCCESSFUL_UPLOAD_TIMESTAMP, remoteTranslogShardStats.lastSuccessfulUploadTimestamp);

builder.startObject(UploadStatsFields.TOTAL_UPLOADS);
builder.field(SubFields.STARTED, remoteTranslogShardStats.totalUploadsStarted)
.field(SubFields.FAILED, remoteTranslogShardStats.totalUploadsFailed)
.field(SubFields.SUCCEEDED, remoteTranslogShardStats.totalUploadsSucceeded);
builder.endObject();

builder.startObject(UploadStatsFields.TOTAL_UPLOADS_IN_BYTES);
builder.field(SubFields.STARTED, remoteTranslogShardStats.uploadBytesStarted)
.field(SubFields.FAILED, remoteTranslogShardStats.uploadBytesFailed)
.field(SubFields.SUCCEEDED, remoteTranslogShardStats.uploadBytesSucceeded);
builder.endObject();

builder.field(UploadStatsFields.TOTAL_UPLOAD_TIME_IN_MILLIS, remoteTranslogShardStats.totalUploadTimeInMillis);

builder.startObject(UploadStatsFields.UPLOAD_SIZE_IN_BYTES);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadBytesMovingAverage);
builder.endObject();

builder.startObject(UploadStatsFields.UPLOAD_SPEED_IN_BYTES_PER_SEC);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadBytesPerSecMovingAverage);
builder.endObject();

builder.startObject(UploadStatsFields.UPLOAD_TIME_IN_MILLIS);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadTimeMovingAverage);
builder.endObject();
}

private void buildTranslogDownloadStats(XContentBuilder builder) throws IOException {
builder.field(DownloadStatsFields.LAST_SUCCESSFUL_DOWNLOAD_TIMESTAMP, remoteTranslogShardStats.lastSuccessfulDownloadTimestamp);

builder.startObject(DownloadStatsFields.TOTAL_DOWNLOADS);
builder.field(SubFields.SUCCEEDED, remoteTranslogShardStats.totalDownloadsSucceeded);
builder.endObject();

builder.startObject(DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES);
builder.field(SubFields.SUCCEEDED, remoteTranslogShardStats.downloadBytesSucceeded);
builder.endObject();

builder.field(DownloadStatsFields.TOTAL_DOWNLOAD_TIME_IN_MILLIS, remoteTranslogShardStats.totalDownloadTimeInMillis);

builder.startObject(DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.downloadBytesMovingAverage);
builder.endObject();

builder.startObject(DownloadStatsFields.DOWNLOAD_SPEED_IN_BYTES_PER_SEC);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.downloadBytesPerSecMovingAverage);
builder.endObject();

builder.startObject(DownloadStatsFields.DOWNLOAD_TIME_IN_MILLIS);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.downloadTimeMovingAverage);
builder.endObject();
}

private void buildSegmentUploadStats(XContentBuilder builder) throws IOException {
builder.field(UploadStatsFields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentShardStats.localRefreshClockTimeMs)
.field(UploadStatsFields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentShardStats.remoteRefreshClockTimeMs)
Expand All @@ -98,7 +181,7 @@ private void buildSegmentUploadStats(XContentBuilder builder) throws IOException
.field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.lastSuccessfulRemoteRefreshBytes)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesMovingAverage);
builder.endObject();
builder.startObject(UploadStatsFields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)
builder.startObject(UploadStatsFields.UPLOAD_SPEED_IN_BYTES_PER_SEC)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesPerSecMovingAverage);
builder.endObject();
builder.startObject(UploadStatsFields.REMOTE_REFRESH_LATENCY_IN_MILLIS)
Expand Down Expand Up @@ -133,6 +216,9 @@ private void buildShardRouting(XContentBuilder builder) throws IOException {
builder.endObject();
}

/**
* Fields for remote store stats response
*/
static final class Fields {
static final String ROUTING = "routing";
static final String SEGMENT = "segment";
Expand Down Expand Up @@ -189,51 +275,96 @@ static final class UploadStatsFields {
*/
static final String TOTAL_SYNCS_TO_REMOTE = "total_syncs_to_remote";

/**
* Represents the size of new data to be uploaded as part of a refresh
*/
static final String REMOTE_REFRESH_SIZE_IN_BYTES = "remote_refresh_size_in_bytes";

/**
* Time taken by a single remote refresh
*/
static final String REMOTE_REFRESH_LATENCY_IN_MILLIS = "remote_refresh_latency_in_millis";

/**
* Timestamp of last successful remote store upload
*/
static final String LAST_SUCCESSFUL_UPLOAD_TIMESTAMP = "last_successful_upload_timestamp";

/**
* Number of total uploads to remote store
*/
static final String TOTAL_UPLOADS = "total_uploads";

/**
* Represents the total uploads to remote store in bytes
*/
static final String TOTAL_UPLOADS_IN_BYTES = "total_uploads_in_bytes";

/**
* Represents the size of new data to be uploaded as part of a refresh
* Total time spent on remote store uplaods
*/
static final String REMOTE_REFRESH_SIZE_IN_BYTES = "remote_refresh_size_in_bytes";
static final String TOTAL_UPLOAD_TIME_IN_MILLIS = "total_upload_time_in_millis";

/**
* Represents the size of new data to be transferred as part of a remote store upload
*/
static final String UPLOAD_SIZE_IN_BYTES = "upload_size_in_bytes";

/**
* Represents the speed of remote store uploads in bytes per sec
*/
static final String UPLOAD_LATENCY_IN_BYTES_PER_SEC = "upload_latency_in_bytes_per_sec";
static final String UPLOAD_SPEED_IN_BYTES_PER_SEC = "upload_speed_in_bytes_per_sec";

/**
* Time taken by a single remote refresh
* Time taken by a remote store upload
*/
static final String REMOTE_REFRESH_LATENCY_IN_MILLIS = "remote_refresh_latency_in_millis";
static final String UPLOAD_TIME_IN_MILLIS = "upload_time_in_millis";
}

static final class DownloadStatsFields {
/**
* Epoch timestamp of the last successful download session
*/
public static final String LAST_SUCCESSFUL_DOWNLOAD_TIMESTAMP = "last_successful_download_timestamp";

/**
* Last successful sync from remote in milliseconds
*/
static final String LAST_SYNC_TIMESTAMP = "last_sync_timestamp";

/**
* Total bytes of segment files downloaded from the remote store for a specific shard
* Count of total remote store download sessions
*/
public static final String TOTAL_DOWNLOADS = "total_downloads";

/**
* Total time spent in downloads from remote store
*/
public static final String TOTAL_DOWNLOAD_TIME_IN_MILLIS = "total_download_time_in_millis";

/**
* Total bytes of files downloaded from the remote store
*/
static final String TOTAL_DOWNLOADS_IN_BYTES = "total_downloads_in_bytes";

/**
* Size of each segment file downloaded from the remote store
* Average size of a file downloaded from the remote store
*/
static final String DOWNLOAD_SIZE_IN_BYTES = "download_size_in_bytes";

/**
* Speed (in bytes/sec) for segment file downloads
* Average speed (in bytes/sec) of a remote store download session
*/
static final String DOWNLOAD_SPEED_IN_BYTES_PER_SEC = "download_speed_in_bytes_per_sec";

/**
* Average time spent on a remote store download session
*/
public static final String DOWNLOAD_TIME_IN_MILLIS = "download_time_in_millis";
}

/**
* Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields}
* Reusable sub fields for {@link Fields}
*/
static final class SubFields {
static final String STARTED = "started";
Expand All @@ -244,12 +375,12 @@ static final class SubFields {
static final String UPLOAD = "upload";

/**
* Moving avg over last N values stat
* Moving avg over last N values stat for a {@link Fields}
*/
static final String MOVING_AVG = "moving_avg";

/**
* Most recent successful attempt stat
* Most recent successful attempt stat for a {@link Fields}
*/
static final String LAST_SUCCESSFUL = "last_successful";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.indices.IndicesService;
Expand All @@ -49,7 +50,6 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct
RemoteStoreStats> {

private final IndicesService indicesService;

private final RemoteStorePressureService remoteStorePressureService;

@Inject
Expand Down Expand Up @@ -96,6 +96,7 @@ protected ShardsIterator shards(ClusterState clusterState, RemoteStoreStatsReque
|| (shardRouting.currentNodeId() == null
|| shardRouting.currentNodeId().equals(clusterState.getNodes().getLocalNodeId()))
)
.filter(ShardRouting::primary)
.filter(
shardRouting -> Boolean.parseBoolean(
clusterState.getMetadata().index(shardRouting.index()).getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
Expand Down Expand Up @@ -157,6 +158,10 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard
indexShard.shardId()
);
assert Objects.nonNull(remoteSegmentTransferTracker);
return new RemoteStoreStats(remoteSegmentTransferTracker.stats(), indexShard.routingEntry());

RemoteTranslogTracker remoteTranslogTracker = remoteStorePressureService.getRemoteTranslogTracker(indexShard.shardId());
assert Objects.nonNull(remoteTranslogTracker);

return new RemoteStoreStats(remoteSegmentTransferTracker.stats(), remoteTranslogTracker.stats(), indexShard.routingEntry());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.DOWNLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.DOWNLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.DOWNLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
Expand Down
Loading

0 comments on commit 2b56430

Please sign in to comment.