diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 094e80d536647..4c839a4c6d9dd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -95,10 +95,6 @@ protected void assertDocCounts(int expectedDocCount, String... nodeNames) { } } - protected ClusterState getClusterState() { - return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); - } - protected DiscoveryNode getNodeContainingPrimaryShard() { final ClusterState state = getClusterState(); final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java index c3e4af0f05e35..10169c969015f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java @@ -8,12 +8,19 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.common.settings.Settings; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; @@ -38,6 +45,18 @@ public void testWritesRejected() { () -> indexData(randomIntBetween(10, 20), randomBoolean()) ); assertTrue(ex.getMessage().contains("rejected execution on primary shard")); + String shardId = "0"; + RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get(); + final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); + List matches = Arrays.stream(response.getShards()) + .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) + .collect(Collectors.toList()); + assertEquals(1, matches.size()); + RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats(); + assertTrue(stats.bytesLag > 0); + assertTrue(stats.refreshTimeLagMs > 0); + assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0); + assertTrue(stats.rejectionCount > 0); deleteRepo(); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 0914506e632dd..42850fc59c8ad 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -16,14 +16,10 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.transport.MockTransportService; import java.nio.file.Path; -import java.util.Collection; -import static java.util.Arrays.asList; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { @@ -31,11 +27,6 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final int SHARD_COUNT = 1; protected static final int REPLICA_COUNT = 1; - @Override - protected Collection> nodePlugins() { - return asList(MockTransportService.TestPlugin.class); - } - @Override protected boolean addMockInternalEngine() { return false; @@ -47,6 +38,10 @@ protected Settings featureFlagSettings() { } public Settings indexSettings() { + return defaultIndexSettings(); + } + + private Settings defaultIndexSettings() { return Settings.builder() .put(super.indexSettings()) .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) @@ -59,6 +54,22 @@ public Settings indexSettings() { .build(); } + protected Settings remoteStoreIndexSettings(int numberOfReplicas) { + return Settings.builder() + .put(defaultIndexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .build(); + } + + protected Settings remoteTranslogIndexSettings(int numberOfReplicas) { + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + @Before public void setup() { internalCluster().startClusterManagerOnlyNode(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index f069950c11f17..290f0df591c64 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -52,22 +52,6 @@ public Settings indexSettings() { return remoteStoreIndexSettings(0); } - private Settings remoteStoreIndexSettings(int numberOfReplicas) { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .build(); - } - - private Settings remoteTranslogIndexSettings(int numberOfReplicas) { - return Settings.builder() - .put(remoteStoreIndexSettings(numberOfReplicas)) - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) - .build(); - } - private IndexResponse indexSingleDoc() { return client().prepareIndex(INDEX_NAME) .setId(UUIDs.randomBase64UUID()) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java new file mode 100644 index 0000000000000..3540fb2a6226f --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.common.UUIDs; +import org.opensearch.index.remote.RemoteRefreshSegmentTracker; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME = "remote-store-test-idx-1"; + + public void testStatsResponseFromAllNodes() { + + // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes + // during this time frame. This ensures that the segment upload has started. + internalCluster().startDataOnlyNodes(3); + if (randomBoolean()) { + createIndex(INDEX_NAME, remoteTranslogIndexSettings(0)); + } else { + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + } + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + // Indexing documents along with refreshes and flushes. + for (int i = 0; i < randomIntBetween(5, 10); i++) { + if (randomBoolean()) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + int numberOfOperations = randomIntBetween(20, 50); + for (int j = 0; j < numberOfOperations; j++) { + indexSingleDoc(); + } + } + + // Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from + // each of the node in the cluster and check that the response is coming as expected. + ClusterState state = getClusterState(); + List nodes = StreamSupport.stream(state.nodes().getNodes().values().spliterator(), false) + .map(x -> x.value.getName()) + .collect(Collectors.toList()); + String shardId = "0"; + for (String node : nodes) { + RemoteStoreStatsResponse response = client(node).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get(); + assertTrue(response.getSuccessfulShards() > 0); + assertTrue(response.getShards() != null && response.getShards().length != 0); + final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); + List matches = Arrays.stream(response.getShards()) + .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) + .collect(Collectors.toList()); + assertEquals(1, matches.size()); + RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats(); + assertEquals(0, stats.refreshTimeLagMs); + assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber); + assertTrue(stats.uploadBytesStarted > 0); + assertEquals(0, stats.uploadBytesFailed); + assertTrue(stats.uploadBytesSucceeded > 0); + assertTrue(stats.totalUploadsStarted > 0); + assertEquals(0, stats.totalUploadsFailed); + assertTrue(stats.totalUploadsSucceeded > 0); + assertEquals(0, stats.rejectionCount); + assertEquals(0, stats.consecutiveFailuresCount); + assertEquals(0, stats.bytesLag); + assertTrue(stats.uploadBytesMovingAverage > 0); + assertTrue(stats.uploadBytesPerSecMovingAverage > 0); + assertTrue(stats.uploadTimeMovingAverage > 0); + } + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java index 1d6361b3acfd5..ff974e32678d6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java @@ -14,6 +14,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.routing.PlainShardsIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -89,14 +90,17 @@ protected ShardsIterator shards(ClusterState clusterState, RemoteStoreStatsReque } return new PlainShardsIterator( newShardRoutings.stream() - .filter(shardRouting -> remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardRouting.shardId()) != null) .filter( shardRouting -> !request.local() || (shardRouting.currentNodeId() == null || shardRouting.currentNodeId().equals(clusterState.getNodes().getLocalNodeId())) ) .filter(ShardRouting::primary) - .filter(shardRouting -> indicesService.indexService(shardRouting.index()).getIndexSettings().isRemoteStoreEnabled()) + .filter( + shardRouting -> Boolean.parseBoolean( + clusterState.getMetadata().index(shardRouting.index()).getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED) + ) + ) .collect(Collectors.toList()) ); } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index dfd52949d3a6b..265566cd375fd 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -325,6 +325,7 @@ void incrementRejectionCount() { void incrementRejectionCount(String rejectionReason) { rejectionCountMap.computeIfAbsent(rejectionReason, k -> new AtomicLong()).incrementAndGet(); + incrementRejectionCount(); } long getRejectionCount(String rejectionReason) { diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 173d6bde71eab..0252a08bd1d87 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2517,4 +2517,8 @@ protected String replicaNodeName(String indexName) { return clusterState.getRoutingNodes().node(nodeId).node().getName(); } + protected ClusterState getClusterState() { + return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + } + }