diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 36569d10c50f6..e9626e6ecc0bd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -23,6 +23,7 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; +import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; @@ -143,10 +144,14 @@ protected void waitForSegmentReplication(String node) throws Exception { SegmentReplicationStatsResponse segmentReplicationStatsResponse = client(node).admin() .indices() .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) .execute() .actionGet(); + final SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats() + .get(INDEX_NAME) + .get(0); assertEquals( - segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), + perGroupStats.getReplicaStats().stream().findFirst().get().getCurrentReplicationState().getStage(), SegmentReplicationState.Stage.DONE ); }, 1, TimeUnit.MINUTES); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index a7506f6205409..51c0c8710d39d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -750,7 +750,7 @@ public 
void testReplicaHasDiffFilesThanPrimary() throws Exception { assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } - public void testSegmentReplicationStats() throws Exception { + public void testPressureServiceStats() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); final String replicaNode = internalCluster().startNode(); @@ -784,7 +784,6 @@ public void testSegmentReplicationStats() throws Exception { SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId()); Set replicaStats = groupStats.getReplicaStats(); assertEquals(1, replicaStats.size()); - assertEquals(replica.routingEntry().currentNodeId(), replicaStats.stream().findFirst().get().getNodeId()); // assert replica node returns nothing. SegmentReplicationPressureService replicaNode_service = internalCluster().getInstance( @@ -816,7 +815,6 @@ public void testSegmentReplicationStats() throws Exception { assertEquals(1, replicaNode_service.nodeStats().getShardStats().size()); replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats(); assertEquals(1, replicaStats.size()); - assertEquals(replica.routingEntry().currentNodeId(), replicaStats.stream().findFirst().get().getNodeId()); // test a checkpoint without any new segments flush(INDEX_NAME); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 3f316524556ee..78afb72f1ee3e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -534,7 +534,7 @@ public void testFlushAfterRelocation() throws Exception { 
.prepareSegmentReplicationStats(INDEX_NAME) .execute() .actionGet(); - assertFalse(segmentReplicationStatsResponse.hasSegmentReplicationStats()); + assertTrue(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0).getReplicaStats().isEmpty()); // Relocate primary to new primary. When new primary starts it does perform a flush. logger.info("--> relocate the shard from primary to newPrimary"); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index b527347c786d5..921c9e729cefe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -11,11 +11,18 @@ import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; +import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -53,18 +60,20 @@ public void testSegmentReplicationStatsResponse() throws Exception { SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() .indices() .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) .execute() .actionGet(); - 
assertEquals(segmentReplicationStatsResponse.shardSegmentReplicationStates().size(), 1); + SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); + final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats() + .stream() + .findFirst() + .get() + .getCurrentReplicationState(); + assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 2); assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 2); - assertEquals( - segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), - SegmentReplicationState.Stage.DONE - ); - assertTrue( - segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0 - ); + assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE); + assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0); }, 1, TimeUnit.MINUTES); } @@ -120,27 +129,250 @@ public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throw .indices() .prepareSegmentReplicationStats(INDEX_NAME) .setActiveOnly(true) + .setDetailed(true) .execute() .actionGet(); - assertEquals( - activeOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), - SegmentReplicationState.Stage.GET_FILES - ); + SegmentReplicationPerGroupStats perGroupStats = activeOnlyResponse.getReplicationStats().get(INDEX_NAME).get(0); + SegmentReplicationState.Stage stage = perGroupStats.getReplicaStats() + .stream() + .findFirst() + .get() + .getCurrentReplicationState() + .getStage(); + assertEquals(SegmentReplicationState.Stage.GET_FILES, stage); // verifying completed_only by checking if current stage is DONE SegmentReplicationStatsResponse completedOnlyResponse = client().admin() .indices() 
.prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) .setCompletedOnly(true) .execute() .actionGet(); - assertEquals(completedOnlyResponse.shardSegmentReplicationStates().size(), SHARD_COUNT); - assertEquals( - completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), - SegmentReplicationState.Stage.DONE - ); - assertTrue(completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0); + assertEquals(completedOnlyResponse.getReplicationStats().size(), SHARD_COUNT); + perGroupStats = completedOnlyResponse.getReplicationStats().get(INDEX_NAME).get(0); + final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats() + .stream() + .findFirst() + .get() + .getCurrentReplicationState(); + + assertEquals(SegmentReplicationState.Stage.DONE, currentReplicationState.getStage()); + assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0); waitForAssertions.countDown(); } + public void testNonDetailedResponse() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + int numReplicas = 4; + List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startNode(); + nodes.add(primaryNode); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas) + .build() + ); + ensureYellow(INDEX_NAME); + for (int i = 0; i < numReplicas; i++) { + nodes.add(internalCluster().startNode()); + } + ensureGreen(INDEX_NAME); + + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME); + waitForSearchableDocs(numDocs, nodes); + + final IndexShard indexShard = getIndexShard(primaryNode, INDEX_NAME); + + assertBusy(() -> { + SegmentReplicationStatsResponse segmentReplicationStatsResponse = 
dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .execute() + .actionGet(); + + final Map> replicationStats = segmentReplicationStatsResponse + .getReplicationStats(); + assertEquals(1, replicationStats.size()); + final List replicationPerGroupStats = replicationStats.get(INDEX_NAME); + assertEquals(1, replicationPerGroupStats.size()); + final SegmentReplicationPerGroupStats perGroupStats = replicationPerGroupStats.get(0); + assertEquals(perGroupStats.getShardId(), indexShard.shardId()); + final Set replicaStats = perGroupStats.getReplicaStats(); + assertEquals(4, replicaStats.size()); + for (SegmentReplicationShardStats replica : replicaStats) { + assertNotNull(replica.getCurrentReplicationState()); + } + }); + } + + public void testGetSpecificShard() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startNode(); + nodes.add(primaryNode); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .build() + ); + ensureYellowAndNoInitializingShards(INDEX_NAME); + nodes.add(internalCluster().startNode()); + ensureGreen(INDEX_NAME); + + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME); + waitForSearchableDocs(numDocs, nodes); + + final IndexShard indexShard = getIndexShard(primaryNode, INDEX_NAME); + + // search for all + SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setActiveOnly(true) + .execute() + .actionGet(); + + Map> replicationStats = segmentReplicationStatsResponse.getReplicationStats(); + assertEquals(1, replicationStats.size()); + List replicationPerGroupStats = 
replicationStats.get(INDEX_NAME); + assertEquals(2, replicationPerGroupStats.size()); + for (SegmentReplicationPerGroupStats group : replicationPerGroupStats) { + assertEquals(1, group.getReplicaStats().size()); + } + + // now search for one shard. + final int id = indexShard.shardId().getId(); + segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setActiveOnly(true) + .shards(String.valueOf(id)) + .execute() + .actionGet(); + + replicationStats = segmentReplicationStatsResponse.getReplicationStats(); + assertEquals(1, replicationStats.size()); + replicationPerGroupStats = replicationStats.get(INDEX_NAME); + assertEquals(1, replicationPerGroupStats.size()); + for (SegmentReplicationPerGroupStats group : replicationPerGroupStats) { + assertEquals(group.getShardId(), indexShard.shardId()); + assertEquals(1, group.getReplicaStats().size()); + } + + } + + public void testMultipleIndices() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String index_2 = "tst-index-2"; + List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startNode(); + nodes.add(primaryNode); + createIndex(INDEX_NAME, index_2); + + ensureYellowAndNoInitializingShards(INDEX_NAME, index_2); + nodes.add(internalCluster().startNode()); + ensureGreen(INDEX_NAME, index_2); + + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + index(index_2, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME, index_2); + waitForSearchableDocs(INDEX_NAME, numDocs, nodes); + waitForSearchableDocs(index_2, numDocs, nodes); + + final IndexShard index_1_primary = getIndexShard(primaryNode, INDEX_NAME); + final IndexShard index_2_primary = getIndexShard(primaryNode, index_2); + + assertTrue(index_1_primary.routingEntry().primary()); + assertTrue(index_2_primary.routingEntry().primary()); + + // test both indices are 
returned in the response. + SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats() + .execute() + .actionGet(); + + Map> replicationStats = segmentReplicationStatsResponse.getReplicationStats(); + assertEquals(2, replicationStats.size()); + List replicationPerGroupStats = replicationStats.get(INDEX_NAME); + assertEquals(1, replicationPerGroupStats.size()); + SegmentReplicationPerGroupStats perGroupStats = replicationPerGroupStats.get(0); + assertEquals(perGroupStats.getShardId(), index_1_primary.shardId()); + Set replicaStats = perGroupStats.getReplicaStats(); + assertEquals(1, replicaStats.size()); + for (SegmentReplicationShardStats replica : replicaStats) { + assertNotNull(replica.getCurrentReplicationState()); + } + + replicationPerGroupStats = replicationStats.get(index_2); + assertEquals(1, replicationPerGroupStats.size()); + perGroupStats = replicationPerGroupStats.get(0); + assertEquals(perGroupStats.getShardId(), index_2_primary.shardId()); + replicaStats = perGroupStats.getReplicaStats(); + assertEquals(1, replicaStats.size()); + for (SegmentReplicationShardStats replica : replicaStats) { + assertNotNull(replica.getCurrentReplicationState()); + } + + // test only single index queried. 
+ segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats() + .setIndices(index_2) + .execute() + .actionGet(); + assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size()); + assertTrue(segmentReplicationStatsResponse.getReplicationStats().containsKey(index_2)); + } + + public void testQueryAgainstDocRepIndex() { + internalCluster().startClusterManagerOnlyNode(); + List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startNode(); + nodes.add(primaryNode); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + .build() + ); + ensureYellowAndNoInitializingShards(INDEX_NAME); + nodes.add(internalCluster().startNode()); + ensureGreen(INDEX_NAME); + + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME); + + // search for all + SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .execute() + .actionGet(); + assertTrue(segmentReplicationStatsResponse.getReplicationStats().isEmpty()); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java new file mode 100644 index 0000000000000..b6855507669b6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + 
* compatible open source license. + */ + +package org.opensearch.action.admin.indices.replication; + +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.indices.replication.SegmentReplicationState; + +import java.io.IOException; + +/** + * Segment Replication specific response object for fetching stats from either a primary + * or replica shard. The stats returned are different depending on primary or replica. Each single-argument, non-stream constructor stores its argument in one field and sets the other field to {@code null}. + * + * @opensearch.internal + */ +public class SegmentReplicationShardStatsResponse implements Writeable { + + /** Per-group stats stored by the {@link SegmentReplicationPerGroupStats} constructor; {@code null} when built from a replica state. */ @Nullable + private final SegmentReplicationPerGroupStats primaryStats; + + /** Replication state stored by the {@link SegmentReplicationState} constructor; {@code null} when built from per-group stats. */ @Nullable + private final SegmentReplicationState replicaStats; + + /** Deserializes a response: reads the optional per-group stats first, then the optional replica state. */ public SegmentReplicationShardStatsResponse(StreamInput in) throws IOException { + this.primaryStats = in.readOptionalWriteable(SegmentReplicationPerGroupStats::new); + this.replicaStats = in.readOptionalWriteable(SegmentReplicationState::new); + } + + /** Builds a response carrying {@code primaryStats}; the replica state is set to {@code null}. */ public SegmentReplicationShardStatsResponse(SegmentReplicationPerGroupStats primaryStats) { + this.primaryStats = primaryStats; + this.replicaStats = null; + } + + /** Builds a response carrying {@code replicaStats}; the per-group stats are set to {@code null}. */ public SegmentReplicationShardStatsResponse(SegmentReplicationState replicaStats) { + this.replicaStats = replicaStats; + this.primaryStats = null; + } + + /** Returns the stored per-group stats, which may be {@code null}. */ public SegmentReplicationPerGroupStats getPrimaryStats() { + return primaryStats; + } + + /** Returns the stored replica replication state, which may be {@code null}. */ public SegmentReplicationState getReplicaStats() { + return replicaStats; + } + + /** Writes the per-group stats and then the replica state, each as an optional writeable, in the same order the stream constructor reads them. */ @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(primaryStats); + out.writeOptionalWriteable(replicaStats); + } + + /** Returns a string containing both fields. */ @Override + public String toString() { + return "SegmentReplicationShardStatsResponse{" + "primaryStats=" + primaryStats + ", replicaStats=" + replicaStats + '}'; + } +} diff --git
a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java index 2f72d7dd3e544..a72455be3713a 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java @@ -15,7 +15,7 @@ import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.indices.replication.SegmentReplicationState; +import org.opensearch.index.SegmentReplicationPerGroupStats; import java.io.IOException; import java.util.List; @@ -27,54 +27,50 @@ * @opensearch.internal */ public class SegmentReplicationStatsResponse extends BroadcastResponse { - private final Map> shardSegmentReplicationStates; + private final Map> replicationStats; public SegmentReplicationStatsResponse(StreamInput in) throws IOException { super(in); - shardSegmentReplicationStates = in.readMapOfLists(StreamInput::readString, SegmentReplicationState::new); + replicationStats = in.readMapOfLists(StreamInput::readString, SegmentReplicationPerGroupStats::new); } /** * Constructs segment replication stats information for a collection of indices and associated shards. Keeps track of how many total shards * were seen, and out of those how many were successfully processed and how many failed. 
* - * @param totalShards Total count of shards seen - * @param successfulShards Count of shards successfully processed - * @param failedShards Count of shards which failed to process - * @param shardSegmentReplicationStates Map of indices to shard replication information - * @param shardFailures List of failures processing shards + * @param totalShards Total count of shards seen + * @param successfulShards Count of shards successfully processed + * @param failedShards Count of shards which failed to process + * @param replicationStats Map of indices to a list of {@link SegmentReplicationPerGroupStats} + * @param shardFailures List of failures processing shards */ public SegmentReplicationStatsResponse( int totalShards, int successfulShards, int failedShards, - Map> shardSegmentReplicationStates, + Map> replicationStats, List shardFailures ) { super(totalShards, successfulShards, failedShards, shardFailures); - this.shardSegmentReplicationStates = shardSegmentReplicationStates; + this.replicationStats = replicationStats; } - public boolean hasSegmentReplicationStats() { - return shardSegmentReplicationStates.size() > 0; - } - - public Map> shardSegmentReplicationStates() { - return shardSegmentReplicationStates; + public Map> getReplicationStats() { + return replicationStats; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - if (hasSegmentReplicationStats()) { - for (String index : shardSegmentReplicationStates.keySet()) { - List segmentReplicationStates = shardSegmentReplicationStates.get(index); + if (replicationStats.size() > 0) { + for (String index : replicationStats.keySet()) { + List segmentReplicationStates = replicationStats.get(index); if (segmentReplicationStates == null || segmentReplicationStates.size() == 0) { continue; } builder.startObject(index); - builder.startArray("shards"); - for (SegmentReplicationState segmentReplicationState : segmentReplicationStates) { + 
builder.startArray("primary_stats"); + for (SegmentReplicationPerGroupStats segmentReplicationState : segmentReplicationStates) { builder.startObject(); segmentReplicationState.toXContent(builder, params); builder.endObject(); @@ -90,7 +86,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeMapOfLists(shardSegmentReplicationStates, StreamOutput::writeString, (o, v) -> v.writeTo(o)); + out.writeMapOfLists(replicationStats, StreamOutput::writeString, (o, v) -> v.writeTo(o)); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java index 67849002ab811..a94a4a0a8db04 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java @@ -21,6 +21,9 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.index.IndexService; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; @@ -30,12 +33,12 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import java.util.Arrays; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; +import java.util.stream.Collectors; /** * Transport action for shard segment replication operation. 
This transport action does not actually @@ -46,10 +49,11 @@ public class TransportSegmentReplicationStatsAction extends TransportBroadcastByNodeAction< SegmentReplicationStatsRequest, SegmentReplicationStatsResponse, - SegmentReplicationState> { + SegmentReplicationShardStatsResponse> { private final SegmentReplicationTargetService targetService; private final IndicesService indicesService; + private final SegmentReplicationPressureService pressureService; @Inject public TransportSegmentReplicationStatsAction( @@ -58,7 +62,8 @@ public TransportSegmentReplicationStatsAction( IndicesService indicesService, SegmentReplicationTargetService targetService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + SegmentReplicationPressureService pressureService ) { super( SegmentReplicationStatsAction.NAME, @@ -71,11 +76,12 @@ public TransportSegmentReplicationStatsAction( ); this.indicesService = indicesService; this.targetService = targetService; + this.pressureService = pressureService; } @Override - protected SegmentReplicationState readShardResult(StreamInput in) throws IOException { - return new SegmentReplicationState(in); + protected SegmentReplicationShardStatsResponse readShardResult(StreamInput in) throws IOException { + return new SegmentReplicationShardStatsResponse(in); } @Override @@ -84,35 +90,51 @@ protected SegmentReplicationStatsResponse newResponse( int totalShards, int successfulShards, int failedShards, - List responses, + List responses, List shardFailures, ClusterState clusterState ) { String[] shards = request.shards(); - Set set = new HashSet<>(); - if (shards.length > 0) { - for (String shard : shards) { - set.add(shard); + final List shardsToFetch = Arrays.stream(shards).map(Integer::valueOf).collect(Collectors.toList()); + + // organize replica responses by allocationId. 
+ final Map replicaStats = new HashMap<>(); + // map of index name to list of replication group stats. + final Map> primaryStats = new HashMap<>(); + for (SegmentReplicationShardStatsResponse response : responses) { + if (response != null) { + if (response.getReplicaStats() != null) { + final ShardRouting shardRouting = response.getReplicaStats().getShardRouting(); + if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardRouting.shardId().getId())) { + replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats()); + } + } + if (response.getPrimaryStats() != null) { + final ShardId shardId = response.getPrimaryStats().getShardId(); + if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) { + primaryStats.compute(shardId.getIndexName(), (k, v) -> { + if (v == null) { + final ArrayList list = new ArrayList<>(); + list.add(response.getPrimaryStats()); + return list; + } else { + v.add(response.getPrimaryStats()); + return v; + } + }); + } + } } } - Map> shardResponses = new HashMap<>(); - for (SegmentReplicationState segmentReplicationState : responses) { - if (segmentReplicationState == null) { - continue; + // combine the replica stats to the shard stat entry in each group. + for (Map.Entry> entry : primaryStats.entrySet()) { + for (SegmentReplicationPerGroupStats group : entry.getValue()) { + for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) { + replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null)); + } } - - // Limit responses to only specific shard id's passed in query paramter shards. 
- int shardId = segmentReplicationState.getShardRouting().shardId().id(); - if (shards.length > 0 && set.contains(Integer.toString(shardId)) == false) { - continue; - } - String indexName = segmentReplicationState.getShardRouting().getIndexName(); - if (!shardResponses.containsKey(indexName)) { - shardResponses.put(indexName, new ArrayList<>()); - } - shardResponses.get(indexName).add(segmentReplicationState); } - return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, shardResponses, shardFailures); + return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures); } @Override @@ -121,25 +143,29 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws } @Override - protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) { + protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) { IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); IndexShard indexShard = indexService.getShard(shardRouting.shardId().id()); ShardId shardId = shardRouting.shardId(); - if (indexShard.indexSettings().isSegRepEnabled() == false || shardRouting.primary()) { + if (indexShard.indexSettings().isSegRepEnabled() == false) { return null; } + if (shardRouting.primary()) { + return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard)); + } + // return information about only on-going segment replication events. if (request.activeOnly()) { - return targetService.getOngoingEventSegmentReplicationState(shardId); + return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId)); } // return information about only latest completed segment replication events. 
if (request.completedOnly()) { - return targetService.getlatestCompletedEventSegmentReplicationState(shardId); + return new SegmentReplicationShardStatsResponse(targetService.getlatestCompletedEventSegmentReplicationState(shardId)); } - return targetService.getSegmentReplicationState(shardId); + return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId)); } @Override diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java index 664ac89ff57d6..23beef419e7b6 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java @@ -13,6 +13,7 @@ import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.index.shard.ShardId; import java.io.IOException; import java.util.Set; @@ -24,15 +25,18 @@ */ public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment { + private final ShardId shardId; private final Set replicaStats; private final long rejectedRequestCount; - public SegmentReplicationPerGroupStats(Set replicaStats, long rejectedRequestCount) { + public SegmentReplicationPerGroupStats(ShardId shardId, Set replicaStats, long rejectedRequestCount) { + this.shardId = shardId; this.replicaStats = replicaStats; this.rejectedRequestCount = rejectedRequestCount; } public SegmentReplicationPerGroupStats(StreamInput in) throws IOException { + this.shardId = new ShardId(in); this.replicaStats = in.readSet(SegmentReplicationShardStats::new); this.rejectedRequestCount = in.readVLong(); } @@ -45,6 +49,10 @@ public long getRejectedRequestCount() { return rejectedRequestCount; } + public ShardId getShardId() { + return shardId; + } + @Override public XContentBuilder 
toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("rejected_requests", rejectedRequestCount); @@ -58,6 +66,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); out.writeCollection(replicaStats); out.writeVLong(rejectedRequestCount); } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 874f35daf158f..f31e236fb6184 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -131,6 +131,10 @@ public SegmentReplicationStats nodeStats() { return tracker.getStats(); } + public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) { + return tracker.getStatsForShard(indexShard); + } + public boolean isSegmentReplicationBackpressureEnabled() { return isSegmentReplicationBackpressureEnabled; } @@ -150,5 +154,4 @@ public void setMaxAllowedStaleReplicas(double maxAllowedStaleReplicas) { public void setMaxReplicationTime(TimeValue maxReplicationTime) { this.maxReplicationTime = maxReplicationTime; } - } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java index 8140240110de2..b68c49453222b 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java @@ -8,6 +8,7 @@ package org.opensearch.index; +import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; @@ -15,6 +16,7 @@ import 
org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.indices.replication.SegmentReplicationState; import java.io.IOException; @@ -24,20 +26,23 @@ * @opensearch.internal */ public class SegmentReplicationShardStats implements Writeable, ToXContentFragment { - private final String nodeId; + private final String allocationId; private final long checkpointsBehindCount; private final long bytesBehindCount; private final long currentReplicationTimeMillis; private final long lastCompletedReplicationTimeMillis; + @Nullable + private SegmentReplicationState currentReplicationState; + public SegmentReplicationShardStats( - String nodeId, + String allocationId, long checkpointsBehindCount, long bytesBehindCount, long currentReplicationTimeMillis, long lastCompletedReplicationTime ) { - this.nodeId = nodeId; + this.allocationId = allocationId; this.checkpointsBehindCount = checkpointsBehindCount; this.bytesBehindCount = bytesBehindCount; this.currentReplicationTimeMillis = currentReplicationTimeMillis; @@ -45,15 +50,15 @@ public SegmentReplicationShardStats( } public SegmentReplicationShardStats(StreamInput in) throws IOException { - this.nodeId = in.readString(); + this.allocationId = in.readString(); this.checkpointsBehindCount = in.readVLong(); this.bytesBehindCount = in.readVLong(); this.currentReplicationTimeMillis = in.readVLong(); this.lastCompletedReplicationTimeMillis = in.readVLong(); } - public String getNodeId() { - return nodeId; + public String getAllocationId() { + return allocationId; } public long getCheckpointsBehindCount() { @@ -72,21 +77,35 @@ public long getLastCompletedReplicationTimeMillis() { return lastCompletedReplicationTimeMillis; } + public void setCurrentReplicationState(SegmentReplicationState currentReplicationState) { + this.currentReplicationState = currentReplicationState; + } + + @Nullable + public SegmentReplicationState 
getCurrentReplicationState() { + return currentReplicationState; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("node_id", nodeId); + builder.field("allocation_id", allocationId); builder.field("checkpoints_behind", checkpointsBehindCount); builder.field("bytes_behind", new ByteSizeValue(bytesBehindCount).toString()); builder.field("current_replication_time", new TimeValue(currentReplicationTimeMillis)); builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis)); + if (currentReplicationState != null) { + builder.startObject(); + currentReplicationState.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(nodeId); + out.writeString(allocationId); out.writeVLong(checkpointsBehindCount); out.writeVLong(bytesBehindCount); out.writeVLong(currentReplicationTimeMillis); @@ -96,17 +115,18 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { return "SegmentReplicationShardStats{" - + "nodeId='" - + nodeId - + '\'' + + "allocationId=" + + allocationId + ", checkpointsBehindCount=" + checkpointsBehindCount + ", bytesBehindCount=" + bytesBehindCount - + ", currentReplicationLag=" + + ", currentReplicationTimeMillis=" + currentReplicationTimeMillis - + ", lastCompletedLag=" + + ", lastCompletedReplicationTimeMillis=" + lastCompletedReplicationTimeMillis + + ", currentReplicationState=" + + currentReplicationState + '}'; } } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java index f4629b46d9e02..10975e48443d8 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java +++ 
b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java @@ -28,7 +28,7 @@ public class SegmentReplicationStats implements Writeable, ToXContentFragment { private final Map shardStats; - public SegmentReplicationStats(Map shardStats) { + public SegmentReplicationStats(final Map shardStats) { this.shardStats = shardStats; } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index 201bdc97e2466..d7176127615d5 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -26,7 +26,7 @@ public class SegmentReplicationStatsTracker { private final IndicesService indicesService; - private Map rejectionCount; + private final Map rejectionCount; public SegmentReplicationStatsTracker(IndicesService indicesService) { this.indicesService = indicesService; @@ -38,13 +38,7 @@ public SegmentReplicationStats getStats() { for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) { - stats.putIfAbsent( - indexShard.shardId(), - new SegmentReplicationPerGroupStats( - indexShard.getReplicationStats(), - Optional.ofNullable(rejectionCount.get(indexShard.shardId())).map(AtomicInteger::get).orElse(0) - ) - ); + stats.putIfAbsent(indexShard.shardId(), getStatsForShard(indexShard)); } } } @@ -61,4 +55,12 @@ public void incrementRejectionCount(ShardId shardId) { } }); } + + public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) { + return new SegmentReplicationPerGroupStats( + indexShard.shardId(), + indexShard.getReplicationStats(), + Optional.ofNullable(rejectionCount.get(indexShard.shardId())).map(AtomicInteger::get).orElse(0) + ); + } } diff --git 
a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 4715d1d4ddd5b..6edf008fc35b3 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -71,7 +71,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -1263,7 +1262,7 @@ private SegmentReplicationShardStats buildShardStats( ) { final Map checkpointTimers = checkpointState.checkpointTimers; return new SegmentReplicationShardStats( - Optional.ofNullable(this.routingTable.getByAllocationId(allocationId)).map(ShardRouting::currentNodeId).orElse("not assigned"), + allocationId, checkpointTimers.size(), checkpointState.visibleReplicationCheckpoint == null ? latestCheckpointLength diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java index 15070c723c2c8..08e7b89e68508 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java @@ -15,8 +15,11 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.common.Strings; import org.opensearch.common.Table; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentOpenSearchExtension; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestHandler; @@ 
-24,9 +27,10 @@ import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestResponseListener; -import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; @@ -72,7 +76,7 @@ public BaseRestHandler.RestChannelConsumer doCatRequest(final RestRequest reques return channel -> client.admin() .indices() - .segmentReplicationStats(segmentReplicationStatsRequest, new RestResponseListener(channel) { + .segmentReplicationStats(segmentReplicationStatsRequest, new RestResponseListener<>(channel) { @Override public RestResponse buildResponse(final SegmentReplicationStatsResponse response) throws Exception { return RestTable.buildResponse(buildSegmentReplicationTable(request, response), channel); @@ -90,22 +94,23 @@ protected Table getTableWithHeader(RestRequest request) { Table t = new Table(); t.startHeaders() - .addCell("index", "alias:i,idx;desc:index name") .addCell("shardId", "alias:s;desc: shard Id") - .addCell("time", "alias:t,ti;desc:segment replication time") - .addCell("stage", "alias:st;desc:segment replication stage") - .addCell("source_description", "alias:sdesc;desc:source description") - .addCell("target_host", "alias:thost;desc:target host") .addCell("target_node", "alias:tnode;desc:target node name") - .addCell("files_fetched", "alias:ff;desc:files fetched") - .addCell("files_percent", "alias:fp;desc:percent of files fetched") - .addCell("bytes_fetched", "alias:bf;desc:bytes fetched") - .addCell("bytes_percent", "alias:bp;desc:percent of bytes fetched"); + .addCell("target_host", "alias:thost;desc:target host") + .addCell("checkpoints_behind", "alias:cpb;desc:checkpoints behind primary") + .addCell("bytes_behind", "alias:bb;desc:bytes behind primary") + .addCell("current_lag", "alias:clag;desc:ongoing time elapsed waiting for replica to catch up to primary") + .addCell("last_completed_lag", 
"alias:lcl;desc:time taken for replica to catch up to latest primary refresh") + .addCell("rejected_requests", "alias:rr;desc:count of rejected requests for the replication group"); if (detailed) { - t.addCell("start_time", "alias:start;desc:segment replication start time") - .addCell("start_time_millis", "alias:start_millis;desc:segment replication start time in epoch milliseconds") + t.addCell("stage", "alias:st;desc:segment replication event stage") + .addCell("time", "alias:t,ti;desc:current replication event time") + .addCell("files_fetched", "alias:ff;desc:files fetched") + .addCell("files_percent", "alias:fp;desc:percent of files fetched") + .addCell("bytes_fetched", "alias:bf;desc:bytes fetched") + .addCell("bytes_percent", "alias:bp;desc:percent of bytes fetched") + .addCell("start_time", "alias:start;desc:segment replication start time") .addCell("stop_time", "alias:stop;desc:segment replication stop time") - .addCell("stop_time_millis", "alias:stop_millis;desc:segment replication stop time in epoch milliseconds") .addCell("files", "alias:f;desc:number of files to fetch") .addCell("files_total", "alias:tf;desc:total number of files") .addCell("bytes", "alias:b;desc:number of bytes to fetch") @@ -135,58 +140,61 @@ public Table buildSegmentReplicationTable(RestRequest request, SegmentReplicatio } Table t = getTableWithHeader(request); - for (String index : response.shardSegmentReplicationStates().keySet()) { + for (Map.Entry> entry : response.getReplicationStats().entrySet()) { + final List replicationPerGroupStats = entry.getValue(); - List shardSegmentReplicationStates = response.shardSegmentReplicationStates().get(index); - if (shardSegmentReplicationStates.size() == 0) { + if (replicationPerGroupStats.isEmpty()) { continue; } // Sort ascending by shard id for readability - CollectionUtil.introSort(shardSegmentReplicationStates, new Comparator() { - @Override - public int compare(SegmentReplicationState o1, SegmentReplicationState o2) { - int id1 = 
o1.getShardRouting().shardId().id(); - int id2 = o2.getShardRouting().shardId().id(); - if (id1 < id2) { - return -1; - } else if (id1 > id2) { - return 1; - } else { - return 0; - } - } + CollectionUtil.introSort(replicationPerGroupStats, (o1, o2) -> { + int id1 = o1.getShardId().id(); + int id2 = o2.getShardId().id(); + return Integer.compare(id1, id2); }); - for (SegmentReplicationState state : shardSegmentReplicationStates) { - t.startRow(); - t.addCell(index); - t.addCell(state.getShardRouting().shardId().id()); - t.addCell(new TimeValue(state.getTimer().time())); - t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT)); - t.addCell(state.getSourceDescription()); - t.addCell(state.getTargetNode().getHostName()); - t.addCell(state.getTargetNode().getName()); - t.addCell(state.getIndex().recoveredFileCount()); - t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredFilesPercent())); - t.addCell(state.getIndex().recoveredBytes()); - t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent())); - if (detailed) { - t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime())); - t.addCell(state.getTimer().startTime()); - t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime())); - t.addCell(state.getTimer().stopTime()); - t.addCell(state.getIndex().totalRecoverFiles()); - t.addCell(state.getIndex().totalFileCount()); - t.addCell(state.getIndex().totalRecoverBytes()); - t.addCell(state.getIndex().totalBytes()); - t.addCell(state.getReplicatingStageTime()); - t.addCell(state.getGetCheckpointInfoStageTime()); - t.addCell(state.getFileDiffStageTime()); - t.addCell(state.getGetFileStageTime()); - t.addCell(state.getFinalizeReplicationStageTime()); + for (SegmentReplicationPerGroupStats perGroupStats : replicationPerGroupStats) { + + final Set replicaShardStats = perGroupStats.getReplicaStats(); + + for (SegmentReplicationShardStats shardStats 
: replicaShardStats) { + final SegmentReplicationState state = shardStats.getCurrentReplicationState(); + if (state == null) { + continue; + } + + t.startRow(); + t.addCell(perGroupStats.getShardId()); + // these nulls should never happen, here for safety. + t.addCell(state.getTargetNode().getName()); + t.addCell(state.getTargetNode().getHostName()); + t.addCell(shardStats.getCheckpointsBehindCount()); + t.addCell(new ByteSizeValue(shardStats.getBytesBehindCount())); + t.addCell(new TimeValue(shardStats.getCurrentReplicationTimeMillis())); + t.addCell(new TimeValue(shardStats.getLastCompletedReplicationTimeMillis())); + t.addCell(perGroupStats.getRejectedRequestCount()); + if (detailed) { + t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT)); + t.addCell(new TimeValue(state.getTimer().time())); + t.addCell(state.getIndex().recoveredFileCount()); + t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredFilesPercent())); + t.addCell(state.getIndex().recoveredBytes()); + t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent())); + t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime())); + t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime())); + t.addCell(state.getIndex().totalRecoverFiles()); + t.addCell(state.getIndex().totalFileCount()); + t.addCell(new ByteSizeValue(state.getIndex().totalRecoverBytes())); + t.addCell(new ByteSizeValue(state.getIndex().totalBytes())); + t.addCell(state.getReplicatingStageTime()); + t.addCell(state.getGetCheckpointInfoStageTime()); + t.addCell(state.getFileDiffStageTime()); + t.addCell(state.getGetFileStageTime()); + t.addCell(state.getFinalizeReplicationStageTime()); + } + t.endRow(); } - t.endRow(); } } diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java 
b/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java index ec2b635d2d608..7a0d80d9538ad 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java @@ -11,43 +11,56 @@ import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.support.DefaultShardOperationFailedException; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.AllocationId; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Randomness; import org.opensearch.common.Table; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.Index; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; +import java.util.UUID; +import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class RestCatSegmentReplicationActionTests extends OpenSearchTestCase { - public void testSegmentReplicationAction() { + public void testSegmentReplicationAction() throws IOException { final RestCatSegmentReplicationAction action = new RestCatSegmentReplicationAction(); 
final int totalShards = randomIntBetween(1, 32); final int successfulShards = Math.max(0, totalShards - randomIntBetween(1, 2)); final int failedShards = totalShards - successfulShards; - final Map> shardSegmentReplicationStates = new HashMap<>(); - final List segmentReplicationStates = new ArrayList<>(); + final Map> shardSegmentReplicationStates = new HashMap<>(); + final List groupStats = new ArrayList<>(); + final long rejectedRequestCount = 5L; for (int i = 0; i < successfulShards; i++) { + final ShardId shardId = new ShardId(new Index("index", "_na_"), i); final SegmentReplicationState state = mock(SegmentReplicationState.class); final ShardRouting shardRouting = mock(ShardRouting.class); when(state.getShardRouting()).thenReturn(shardRouting); - when(shardRouting.shardId()).thenReturn(new ShardId(new Index("index", "_na_"), i)); + + when(shardRouting.shardId()).thenReturn(shardId); + final AllocationId aId = mock(AllocationId.class); + when(aId.getId()).thenReturn(UUID.randomUUID().toString()); + when(shardRouting.allocationId()).thenReturn(aId); when(state.getReplicationId()).thenReturn(randomLongBetween(0, 1000)); final ReplicationTimer timer = mock(ReplicationTimer.class); final long startTime = randomLongBetween(0, new Date().getTime()); @@ -60,19 +73,30 @@ public void testSegmentReplicationAction() { when(state.getSourceDescription()).thenReturn("Source"); final DiscoveryNode targetNode = mock(DiscoveryNode.class); when(targetNode.getHostName()).thenReturn(randomAlphaOfLength(8)); + when(targetNode.getName()).thenReturn(UUID.randomUUID().toString()); when(state.getTargetNode()).thenReturn(targetNode); ReplicationLuceneIndex index = createTestIndex(); when(state.getIndex()).thenReturn(index); - // - - segmentReplicationStates.add(state); + final SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats( + state.getShardRouting().allocationId().getId(), + 0L, + 0L, + 0L, + 0L + ); + 
segmentReplicationShardStats.setCurrentReplicationState(state); + final SegmentReplicationPerGroupStats perGroupStats = new SegmentReplicationPerGroupStats( + shardId, + Set.of(segmentReplicationShardStats), + rejectedRequestCount + ); + groupStats.add(perGroupStats); } - final List shuffle = new ArrayList<>(segmentReplicationStates); - Randomness.shuffle(shuffle); - shardSegmentReplicationStates.put("index", shuffle); + Randomness.shuffle(groupStats); + shardSegmentReplicationStates.put("index", groupStats); final List shardFailures = new ArrayList<>(); final SegmentReplicationStatsResponse response = new SegmentReplicationStatsResponse( @@ -88,18 +112,15 @@ public void testSegmentReplicationAction() { List headers = table.getHeaders(); - final List expectedHeaders = Arrays.asList( - "index", + final List expectedHeaders = asList( "shardId", - "time", - "stage", - "source_description", - "target_host", "target_node", - "files_fetched", - "files_percent", - "bytes_fetched", - "bytes_percent" + "target_host", + "checkpoints_behind", + "bytes_behind", + "current_lag", + "last_completed_lag", + "rejected_requests" ); for (int i = 0; i < expectedHeaders.size(); i++) { @@ -109,19 +130,20 @@ public void testSegmentReplicationAction() { assertThat(table.getRows().size(), equalTo(successfulShards)); for (int i = 0; i < successfulShards; i++) { - final SegmentReplicationState state = segmentReplicationStates.get(i); - final List expectedValues = Arrays.asList( - "index", - i, - new TimeValue(state.getTimer().time()), - state.getStage().name().toLowerCase(Locale.ROOT), - state.getSourceDescription(), - state.getTargetNode().getHostName(), - state.getTargetNode().getName(), - state.getIndex().recoveredFileCount(), - percent(state.getIndex().recoveredFilesPercent()), - state.getIndex().recoveredBytes(), - percent(state.getIndex().recoveredBytesPercent()) + final SegmentReplicationPerGroupStats perGroupStats = groupStats.get(i); + final Set replicaStats = 
perGroupStats.getReplicaStats(); + assertEquals(1, replicaStats.size()); + final SegmentReplicationShardStats shardStats = replicaStats.stream().findFirst().get(); + final SegmentReplicationState currentReplicationState = shardStats.getCurrentReplicationState(); + final List expectedValues = asList( + perGroupStats.getShardId(), + currentReplicationState.getTargetNode().getName(), + currentReplicationState.getTargetNode().getHostName(), + shardStats.getCheckpointsBehindCount(), + new ByteSizeValue(shardStats.getBytesBehindCount()), + new TimeValue(shardStats.getCurrentReplicationTimeMillis()), + new TimeValue(shardStats.getLastCompletedReplicationTimeMillis()), + rejectedRequestCount ); final List cells = table.getRows().get(i);