Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add Remote Store backpressure rejection stats to _nodes/stats #10524

Merged
merged 13 commits into from
Oct 14, 2023
Previous commit
Next commit
Update UTs
Signed-off-by: Bhumika Saini <[email protected]>
Bhumika Saini committed Oct 13, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 16ca31229fe814df4aa252fe301075a3608cc650
Original file line number Diff line number Diff line change
@@ -715,7 +715,6 @@ public static final IndexShard newIndexShard(
nodeId,
null,
null

);
}

Original file line number Diff line number Diff line change
@@ -135,6 +135,11 @@ public SegmentReplicationPressureService(
this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this);
}

/**
 * Exposes the {@link SegmentReplicationStatsTracker} held by this instance.
 * Only used for testing.
 *
 * @return the tracker field as-is
 */
public SegmentReplicationStatsTracker getTracker() {
    return this.tracker;
}

// visible for testing
AsyncFailStaleReplicaTask getFailStaleReplicaTask() {
return failStaleReplicaTask;
Original file line number Diff line number Diff line change
@@ -267,7 +267,8 @@ public long getRejectionCount() {
return rejectionCount.get();
}

void incrementRejectionCount() {
/**
 * Adds one to the rejection count.
 * Public only for testing.
 */
public void incrementRejectionCount() {
    this.rejectionCount.incrementAndGet();
}

Original file line number Diff line number Diff line change
@@ -563,6 +563,11 @@ protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() {
return remoteStoreStatsTrackerFactory;
}

/**
 * Exposes the {@link SegmentReplicationPressureService} held by this instance.
 * Only used for testing.
 *
 * @return the segment replication pressure service field as-is
 */
public SegmentReplicationPressureService getSegmentReplicationPressureService() {
    return this.segmentReplicationPressureService;
}

/**
 * Returns the node id reported by the translog configuration.
 *
 * @return the value of {@code translogConfig.getNodeId()}
 */
public String getNodeId() {
    final String nodeId = translogConfig.getNodeId();
    return nodeId;
}
Original file line number Diff line number Diff line change
@@ -96,6 +96,8 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
@@ -1827,6 +1829,7 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException {
.getRemoteSegmentTransferTracker(shard.shardId);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = shard.getRemoteStoreStatsTrackerFactory()
.getRemoteTranslogTransferTracker(shard.shardId);
populateSampleReplicationStats(shard);
populateSampleRemoteSegmentStats(remoteSegmentTransferTracker);
populateSampleRemoteTranslogStats(remoteTranslogTransferTracker);
ShardStats shardStats = new ShardStats(
@@ -1841,9 +1844,28 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException {
assertRemoteSegmentStats(remoteSegmentTransferTracker, remoteSegmentStats);
RemoteTranslogStats remoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats();
assertRemoteTranslogStats(remoteTranslogTransferTracker, remoteTranslogStats);
ReplicationStats replicationStats = shardStats.getStats().getSegments().getReplicationStats();
assertReplicationStats(shard, replicationStats);
closeShards(shard);
}

/**
 * Verifies the rejection total carried by the given replication stats.
 * A shard in primary mode is expected to report 5 rejections; any other
 * shard is expected to report 0.
 *
 * @param shard            shard whose primary mode decides the expected total
 * @param replicationStats stats whose {@code totalRejections} is checked
 */
private static void assertReplicationStats(IndexShard shard, ReplicationStats replicationStats) {
    // Kept as an int so the assertEquals overload chosen matches the literal-based calls.
    final int expectedRejections = shard.isPrimaryMode() ? 5 : 0;
    assertEquals(expectedRejections, replicationStats.totalRejections);
}

/**
 * Seeds sample replication stats for the given shard: when the shard is in
 * primary mode, records 5 rejections against its shard id on the tracker
 * obtained from the shard's segment replication pressure service. Shards
 * not in primary mode are left untouched.
 *
 * @param shard shard to populate sample rejection counts for
 */
private static void populateSampleReplicationStats(IndexShard shard) {
    if (!shard.isPrimaryMode()) {
        return;
    }
    final SegmentReplicationStatsTracker statsTracker = shard.getSegmentReplicationPressureService().getTracker();
    int remaining = 5;
    while (remaining-- > 0) {
        statsTracker.incrementRejectionCount(shard.shardId);
    }
}

public void testRefreshMetric() throws IOException {
IndexShard shard = newStartedShard();
// refresh on: finalize and end of recovery
@@ -4910,6 +4932,8 @@ private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker track
tracker.addUploadBytesStarted(30L);
tracker.addUploadBytesSucceeded(10L);
tracker.addUploadBytesFailed(10L);
tracker.incrementRejectionCount();
tracker.incrementRejectionCount();
}

private void populateSampleRemoteTranslogStats(RemoteTranslogTransferTracker tracker) {
@@ -4943,5 +4967,7 @@ private static void assertRemoteSegmentStats(
assertEquals(remoteSegmentTransferTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted());
assertEquals(remoteSegmentTransferTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(remoteSegmentTransferTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed());
assertTrue(remoteSegmentStats.getTotalRejections() > 0);
assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections());
}
}
Original file line number Diff line number Diff line change
@@ -37,12 +37,15 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.junit.Assert;
import org.mockito.Mockito;
import org.opensearch.ExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
@@ -85,6 +88,7 @@
import org.opensearch.env.TestEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MapperTestUtils;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.VersionType;
import org.opensearch.index.cache.IndexCache;
import org.opensearch.index.cache.query.DisabledQueryCache;
@@ -157,7 +161,6 @@
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.junit.Assert;

import java.io.IOException;
import java.nio.file.Path;
@@ -179,10 +182,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.mockito.Mockito;

import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.opensearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@@ -191,6 +190,8 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.opensearch.test.ClusterServiceUtils.createClusterService;

/**
* A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily,
@@ -643,6 +644,13 @@ protected IndexShard newShard(
);
Store remoteStore;
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = null;
SegmentReplicationPressureService segmentReplicationPressureService = new SegmentReplicationPressureService(
Settings.EMPTY,
clusterService,
mock(IndicesService.class),
mock(ShardStateAction.class),
mock(ThreadPool.class)
);
RepositoriesService mockRepoSvc = mock(RepositoriesService.class);

if (indexSettings.isRemoteStoreEnabled()) {
@@ -703,7 +711,7 @@ protected IndexShard newShard(
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
"dummy-node",
DefaultRecoverySettings.INSTANCE,
null
segmentReplicationPressureService
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
if (remoteStoreStatsTrackerFactory != null) {