Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Remote Store] Fix stats api bugs (#7739) & (#7774) #7838

Merged
merged 2 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@ protected void assertDocCounts(int expectedDocCount, String... nodeNames) {
}
}

protected ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
}

protected DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = getClusterState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

Expand All @@ -38,6 +45,18 @@ public void testWritesRejected() {
() -> indexData(randomIntBetween(10, 20), randomBoolean())
);
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
String shardId = "0";
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
assertTrue(stats.rejectionCount > 0);
deleteRepo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,17 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.nio.file.Path;
import java.util.Collection;

import static java.util.Arrays.asList;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remore-store-repo";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
Expand All @@ -47,6 +38,10 @@ protected Settings featureFlagSettings() {
}

public Settings indexSettings() {
return defaultIndexSettings();
}

private Settings defaultIndexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
Expand All @@ -59,6 +54,22 @@ public Settings indexSettings() {
.build();
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(defaultIndexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.build();
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,6 @@ public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.build();
}

private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.UUIDs;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

public void testStatsResponseFromAllNodes() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
internalCluster().startDataOnlyNodes(3);
if (randomBoolean()) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
}
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

// Indexing documents along with refreshes and flushes.
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
}

// Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from
// each of the node in the cluster and check that the response is coming as expected.
ClusterState state = getClusterState();
List<String> nodes = StreamSupport.stream(state.nodes().getNodes().values().spliterator(), false)
.map(x -> x.value.getName())
.collect(Collectors.toList());
String shardId = "0";
for (String node : nodes) {
RemoteStoreStatsResponse response = client(node).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
assertTrue(response.getSuccessfulShards() > 0);
assertTrue(response.getShards() != null && response.getShards().length != 0);
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
assertEquals(0, stats.refreshTimeLagMs);
assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber);
assertTrue(stats.uploadBytesStarted > 0);
assertEquals(0, stats.uploadBytesFailed);
assertTrue(stats.uploadBytesSucceeded > 0);
assertTrue(stats.totalUploadsStarted > 0);
assertEquals(0, stats.totalUploadsFailed);
assertTrue(stats.totalUploadsSucceeded > 0);
assertEquals(0, stats.rejectionCount);
assertEquals(0, stats.consecutiveFailuresCount);
assertEquals(0, stats.bytesLag);
assertTrue(stats.uploadBytesMovingAverage > 0);
assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
assertTrue(stats.uploadTimeMovingAverage > 0);
}
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.PlainShardsIterator;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -89,14 +90,17 @@ protected ShardsIterator shards(ClusterState clusterState, RemoteStoreStatsReque
}
return new PlainShardsIterator(
newShardRoutings.stream()
.filter(shardRouting -> remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardRouting.shardId()) != null)
.filter(
shardRouting -> !request.local()
|| (shardRouting.currentNodeId() == null
|| shardRouting.currentNodeId().equals(clusterState.getNodes().getLocalNodeId()))
)
.filter(ShardRouting::primary)
.filter(shardRouting -> indicesService.indexService(shardRouting.index()).getIndexSettings().isRemoteStoreEnabled())
.filter(
shardRouting -> Boolean.parseBoolean(
clusterState.getMetadata().index(shardRouting.index()).getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
)
)
.collect(Collectors.toList())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ void incrementRejectionCount() {

void incrementRejectionCount(String rejectionReason) {
rejectionCountMap.computeIfAbsent(rejectionReason, k -> new AtomicLong()).incrementAndGet();
incrementRejectionCount();
}

long getRejectionCount(String rejectionReason) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2517,4 +2517,8 @@ protected String replicaNodeName(String indexName) {
return clusterState.getRoutingNodes().node(nodeId).node().getName();
}

protected ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
}

}