Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add support for Remote Translog Store stats in _remotestore/stats/ API #9263

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Redefine telemetry context restoration and propagation ([#9617](https://github.com/opensearch-project/OpenSearch/pull/9617))
- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
Expand Down Expand Up @@ -72,9 +73,14 @@ public void testStatsResponseFromAllNodes() {
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteSegmentTransferTracker.Stats stats = matches.get(0).getSegmentStats();
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);

RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats();
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

RemoteTranslogTransferTracker.Stats translogStats = matches.get(0).getTranslogStats();
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}

// Step 3 - Enable replicas on the existing indices and ensure that download
Expand All @@ -92,13 +98,20 @@ public void testStatsResponseFromAllNodes() {
for (RemoteStoreStats stat : matches) {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
RemoteSegmentTransferTracker.Stats stats = stat.getSegmentStats();
RemoteSegmentTransferTracker.Stats segmentStats = stat.getSegmentStats();
RemoteTranslogTransferTracker.Stats translogStats = stat.getTranslogStats();
if (routing.primary()) {
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
} else {
validateSegmentDownloadStats(stats);
assertEquals(0, stats.totalUploadsStarted);
validateSegmentDownloadStats(segmentStats);
assertEquals(0, segmentStats.totalUploadsStarted);

assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}
}
}
Expand All @@ -124,10 +137,15 @@ public void testStatsResponseAllShards() {
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertEquals(3, response.getSuccessfulShards());
assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 3);

RemoteSegmentTransferTracker.Stats segmentStats = response.getRemoteStoreStats()[0].getSegmentStats();
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

RemoteTranslogTransferTracker.Stats translogStats = response.getRemoteStoreStats()[0].getTranslogStats();
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);

// Step 3 - Enable replicas on the existing indices and ensure that download
// stats are being populated as well
changeReplicaCountAndEnsureGreen(1);
Expand All @@ -138,12 +156,19 @@ public void testStatsResponseAllShards() {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
segmentStats = stat.getSegmentStats();
translogStats = stat.getTranslogStats();
if (routing.primary()) {
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
} else {
validateSegmentDownloadStats(segmentStats);
assertEquals(0, segmentStats.totalUploadsStarted);

assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}
}

Expand Down Expand Up @@ -174,6 +199,10 @@ public void testStatsResponseFromLocalNode() {
RemoteSegmentTransferTracker.Stats segmentStats = response.getRemoteStoreStats()[0].getSegmentStats();
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

RemoteTranslogTransferTracker.Stats translogStats = response.getRemoteStoreStats()[0].getTranslogStats();
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}
changeReplicaCountAndEnsureGreen(1);
for (String node : nodes) {
Expand All @@ -187,13 +216,20 @@ public void testStatsResponseFromLocalNode() {
for (RemoteStoreStats stat : response.getRemoteStoreStats()) {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
RemoteSegmentTransferTracker.Stats stats = stat.getSegmentStats();
RemoteSegmentTransferTracker.Stats segmentStats = stat.getSegmentStats();
RemoteTranslogTransferTracker.Stats translogStats = stat.getTranslogStats();
if (routing.primary()) {
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
} else {
validateSegmentDownloadStats(stats);
assertEquals(0, stats.totalUploadsStarted);
validateSegmentDownloadStats(segmentStats);
assertEquals(0, segmentStats.totalUploadsStarted);

assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}
}
}
Expand Down Expand Up @@ -491,15 +527,19 @@ public void testStatsOnRemoteStoreRestore() throws IOException {

RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
Arrays.stream(remoteStoreStatsResponse.getRemoteStoreStats()).forEach(statObject -> {
RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getSegmentStats();
RemoteSegmentTransferTracker.Stats segmentStats = statObject.getSegmentStats();
// Assert that we have both upload and download stats for the index
assertTrue(
segmentTracker.totalUploadsStarted > 0 && segmentTracker.totalUploadsSucceeded > 0 && segmentTracker.totalUploadsFailed == 0
segmentStats.totalUploadsStarted > 0 && segmentStats.totalUploadsSucceeded > 0 && segmentStats.totalUploadsFailed == 0
);
assertTrue(
segmentTracker.directoryFileTransferTrackerStats.transferredBytesStarted > 0
&& segmentTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0
segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0
&& segmentStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0
);

RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats();
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertNonZeroTranslogDownloadStats(translogStats);
});
}

Expand All @@ -520,19 +560,23 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
.get()
.getRemoteStoreStats();
Arrays.stream(remoteStoreStats).forEach(statObject -> {
RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getSegmentStats();
RemoteSegmentTransferTracker.Stats segmentStats = statObject.getSegmentStats();
if (statObject.getShardRouting().primary()) {
assertTrue(
segmentTracker.totalUploadsSucceeded == 1
&& segmentTracker.totalUploadsStarted == segmentTracker.totalUploadsSucceeded
&& segmentTracker.totalUploadsFailed == 0
segmentStats.totalUploadsSucceeded == 1
&& segmentStats.totalUploadsStarted == segmentStats.totalUploadsSucceeded
&& segmentStats.totalUploadsFailed == 0
);
} else {
assertTrue(
segmentTracker.directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& segmentTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& segmentStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
);
}

RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats();
assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
});
}, 5, TimeUnit.SECONDS);
}
Expand All @@ -545,9 +589,7 @@ private void indexDocs() {
refresh(INDEX_NAME);
}
int numberOfOperations = randomIntBetween(10, 30);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc(INDEX_NAME);
}
indexBulk(INDEX_NAME, numberOfOperations);
}
}

Expand Down Expand Up @@ -594,6 +636,43 @@ private void validateSegmentDownloadStats(RemoteSegmentTransferTracker.Stats sta
assertTrue(stats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage > 0);
}

private void assertNonZeroTranslogUploadStatsNoFailures(RemoteTranslogTransferTracker.Stats stats) {
assertTrue(stats.uploadBytesStarted > 0);
assertTrue(stats.totalUploadsStarted > 0);
assertEquals(0, stats.uploadBytesFailed);
assertEquals(0, stats.totalUploadsFailed);
assertTrue(stats.uploadBytesSucceeded > 0);
assertTrue(stats.totalUploadsSucceeded > 0);
assertTrue(stats.totalUploadTimeInMillis > 0);
assertTrue(stats.lastSuccessfulUploadTimestamp > 0);
}

private void assertZeroTranslogUploadStats(RemoteTranslogTransferTracker.Stats stats) {
assertEquals(0, stats.uploadBytesStarted);
assertEquals(0, stats.totalUploadsStarted);
assertEquals(0, stats.uploadBytesFailed);
assertEquals(0, stats.totalUploadsFailed);
assertEquals(0, stats.uploadBytesSucceeded);
assertEquals(0, stats.totalUploadsSucceeded);
assertEquals(0, stats.totalUploadTimeInMillis);
assertEquals(0, stats.lastSuccessfulUploadTimestamp);
}

private void assertNonZeroTranslogDownloadStats(RemoteTranslogTransferTracker.Stats stats) {
assertTrue(stats.downloadBytesSucceeded > 0);
assertTrue(stats.totalDownloadsSucceeded > 0);
// TODO: Need to simulate a delay for this assertion to avoid flakiness
// assertTrue(stats.totalDownloadTimeInMillis > 0);
assertTrue(stats.lastSuccessfulDownloadTimestamp > 0);
}

private void assertZeroTranslogDownloadStats(RemoteTranslogTransferTracker.Stats stats) {
assertEquals(0, stats.downloadBytesSucceeded);
assertEquals(0, stats.totalDownloadsSucceeded);
assertEquals(0, stats.totalDownloadTimeInMillis);
assertEquals(0, stats.lastSuccessfulDownloadTimestamp);
}

// Validate if the shardRouting obtained from cluster state contains the exact same routing object
// parameters as obtained from the remote store stats API
private void validateShardRouting(ShardRouting routing) {
Expand Down
Loading
Loading