From abe1ac813bee99203138ceb86e0f6463bbb91420 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 16 Mar 2023 11:09:39 -0700 Subject: [PATCH] Segment Replication - Update /_cat/segment_replication API with backpressure metrics. (#6674) * Segment Replication - Update Segment Replication API with backpressure metrics. This change updates the existing /_cat/segment_replication API to include backpressure metrics. It does this by returning stats from primary shards for its tracked replication group and merging it with metrics returned from replicas. Primary captured metrics will now appear by default, with replica per sync events showing when detailed=true is set. Signed-off-by: Marc Handalian * PR Feedback. Signed-off-by: Marc Handalian * Fixed current_lag header alias. Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationBaseIT.java | 7 +- .../replication/SegmentReplicationIT.java | 4 +- .../SegmentReplicationRelocationIT.java | 2 +- .../SegmentReplicationStatsIT.java | 268 ++++++++++++++++-- .../SegmentReplicationShardStatsResponse.java | 67 +++++ .../SegmentReplicationStatsResponse.java | 40 ++- ...ransportSegmentReplicationStatsAction.java | 90 +++--- .../SegmentReplicationPerGroupStats.java | 11 +- .../SegmentReplicationPressureService.java | 5 +- .../index/SegmentReplicationShardStats.java | 46 ++- .../index/SegmentReplicationStats.java | 2 +- .../index/SegmentReplicationStatsTracker.java | 18 +- .../index/seqno/ReplicationTracker.java | 3 +- .../cat/RestCatSegmentReplicationAction.java | 124 ++++---- .../RestCatSegmentReplicationActionTests.java | 90 +++--- 15 files changed, 582 insertions(+), 195 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 36569d10c50f6..e9626e6ecc0bd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -23,6 +23,7 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; +import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; @@ -143,10 +144,14 @@ protected void waitForSegmentReplication(String node) throws Exception { SegmentReplicationStatsResponse segmentReplicationStatsResponse = client(node).admin() .indices() .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) .execute() .actionGet(); + final SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats() + .get(INDEX_NAME) + .get(0); assertEquals( - segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), + perGroupStats.getReplicaStats().stream().findFirst().get().getCurrentReplicationState().getStage(), SegmentReplicationState.Stage.DONE ); }, 1, TimeUnit.MINUTES); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index a7506f6205409..51c0c8710d39d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -750,7 +750,7 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } - public void testSegmentReplicationStats() throws Exception { + public void testPressureServiceStats() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); final String replicaNode = internalCluster().startNode(); @@ -784,7 +784,6 @@ public void testSegmentReplicationStats() throws Exception { SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId()); Set replicaStats = groupStats.getReplicaStats(); assertEquals(1, replicaStats.size()); - assertEquals(replica.routingEntry().currentNodeId(), replicaStats.stream().findFirst().get().getNodeId()); // assert replica node returns nothing. SegmentReplicationPressureService replicaNode_service = internalCluster().getInstance( @@ -816,7 +815,6 @@ public void testSegmentReplicationStats() throws Exception { assertEquals(1, replicaNode_service.nodeStats().getShardStats().size()); replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats(); assertEquals(1, replicaStats.size()); - assertEquals(replica.routingEntry().currentNodeId(), replicaStats.stream().findFirst().get().getNodeId()); // test a checkpoint without any new segments flush(INDEX_NAME); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 3f316524556ee..78afb72f1ee3e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -534,7 +534,7 @@ public void testFlushAfterRelocation() throws Exception { .prepareSegmentReplicationStats(INDEX_NAME) .execute() .actionGet(); - assertFalse(segmentReplicationStatsResponse.hasSegmentReplicationStats()); + assertTrue(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0).getReplicaStats().isEmpty()); // Relocate primary to new primary. When new primary starts it does perform a flush. logger.info("--> relocate the shard from primary to newPrimary"); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index b527347c786d5..921c9e729cefe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -11,11 +11,18 @@ import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; +import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -53,18 +60,20 @@ public void testSegmentReplicationStatsResponse() throws Exception { SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() .indices() .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) .execute() .actionGet(); - assertEquals(segmentReplicationStatsResponse.shardSegmentReplicationStates().size(), 1); + SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); + final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats() + .stream() + .findFirst() + .get() + .getCurrentReplicationState(); + assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 2); assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 2); - assertEquals( - segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), - SegmentReplicationState.Stage.DONE - ); - assertTrue( - segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0 - ); + assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE); + assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0); }, 1, TimeUnit.MINUTES); } @@ -120,27 +129,250 @@ public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throw .indices() .prepareSegmentReplicationStats(INDEX_NAME) .setActiveOnly(true) + .setDetailed(true) .execute() .actionGet(); - assertEquals( - activeOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), - SegmentReplicationState.Stage.GET_FILES - ); + SegmentReplicationPerGroupStats perGroupStats = activeOnlyResponse.getReplicationStats().get(INDEX_NAME).get(0); + SegmentReplicationState.Stage stage = perGroupStats.getReplicaStats() + .stream() + .findFirst() + .get() + .getCurrentReplicationState() + .getStage(); + assertEquals(SegmentReplicationState.Stage.GET_FILES, stage); // verifying completed_only by checking if current stage is DONE SegmentReplicationStatsResponse completedOnlyResponse = client().admin() .indices() .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) .setCompletedOnly(true) .execute() .actionGet(); - assertEquals(completedOnlyResponse.shardSegmentReplicationStates().size(), SHARD_COUNT); - assertEquals( - completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), - SegmentReplicationState.Stage.DONE - ); - assertTrue(completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0); + assertEquals(completedOnlyResponse.getReplicationStats().size(), SHARD_COUNT); + perGroupStats = completedOnlyResponse.getReplicationStats().get(INDEX_NAME).get(0); + final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats() + .stream() + .findFirst() + .get() + .getCurrentReplicationState(); + + assertEquals(SegmentReplicationState.Stage.DONE, currentReplicationState.getStage()); + assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0); waitForAssertions.countDown(); } + public void testNonDetailedResponse() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + int numReplicas = 4; + List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startNode(); + nodes.add(primaryNode); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas) + .build() + ); + ensureYellow(INDEX_NAME); + for (int i = 0; i < numReplicas; i++) { + nodes.add(internalCluster().startNode()); + } + ensureGreen(INDEX_NAME); + + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME); + waitForSearchableDocs(numDocs, nodes); + + final IndexShard indexShard = getIndexShard(primaryNode, INDEX_NAME); + + assertBusy(() -> { + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .execute() + .actionGet(); + + final Map> replicationStats = segmentReplicationStatsResponse + .getReplicationStats(); + assertEquals(1, replicationStats.size()); + final List replicationPerGroupStats = replicationStats.get(INDEX_NAME); + assertEquals(1, replicationPerGroupStats.size()); + final SegmentReplicationPerGroupStats perGroupStats = replicationPerGroupStats.get(0); + assertEquals(perGroupStats.getShardId(), indexShard.shardId()); + final Set replicaStats = perGroupStats.getReplicaStats(); + assertEquals(4, replicaStats.size()); + for (SegmentReplicationShardStats replica : replicaStats) { + assertNotNull(replica.getCurrentReplicationState()); + } + }); + } + + public void testGetSpecificShard() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startNode(); + nodes.add(primaryNode); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .build() + ); + ensureYellowAndNoInitializingShards(INDEX_NAME); + nodes.add(internalCluster().startNode()); + ensureGreen(INDEX_NAME); + + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME); + waitForSearchableDocs(numDocs, nodes); + + final IndexShard indexShard = getIndexShard(primaryNode, INDEX_NAME); + + // search for all + SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setActiveOnly(true) + .execute() + .actionGet(); + + Map> replicationStats = segmentReplicationStatsResponse.getReplicationStats(); + assertEquals(1, replicationStats.size()); + List replicationPerGroupStats = replicationStats.get(INDEX_NAME); + assertEquals(2, replicationPerGroupStats.size()); + for (SegmentReplicationPerGroupStats group : replicationPerGroupStats) { + assertEquals(1, group.getReplicaStats().size()); + } + + // now search for one shard. + final int id = indexShard.shardId().getId(); + segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setActiveOnly(true) + .shards(String.valueOf(id)) + .execute() + .actionGet(); + + replicationStats = segmentReplicationStatsResponse.getReplicationStats(); + assertEquals(1, replicationStats.size()); + replicationPerGroupStats = replicationStats.get(INDEX_NAME); + assertEquals(1, replicationPerGroupStats.size()); + for (SegmentReplicationPerGroupStats group : replicationPerGroupStats) { + assertEquals(group.getShardId(), indexShard.shardId()); + assertEquals(1, group.getReplicaStats().size()); + } + + } + + public void testMultipleIndices() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String index_2 = "tst-index-2"; + List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startNode(); + nodes.add(primaryNode); + createIndex(INDEX_NAME, index_2); + + ensureYellowAndNoInitializingShards(INDEX_NAME, index_2); + nodes.add(internalCluster().startNode()); + ensureGreen(INDEX_NAME, index_2); + + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + index(index_2, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME, index_2); + waitForSearchableDocs(INDEX_NAME, numDocs, nodes); + waitForSearchableDocs(index_2, numDocs, nodes); + + final IndexShard index_1_primary = getIndexShard(primaryNode, INDEX_NAME); + final IndexShard index_2_primary = getIndexShard(primaryNode, index_2); + + assertTrue(index_1_primary.routingEntry().primary()); + assertTrue(index_2_primary.routingEntry().primary()); + + // test both indices are returned in the response. + SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats() + .execute() + .actionGet(); + + Map> replicationStats = segmentReplicationStatsResponse.getReplicationStats(); + assertEquals(2, replicationStats.size()); + List replicationPerGroupStats = replicationStats.get(INDEX_NAME); + assertEquals(1, replicationPerGroupStats.size()); + SegmentReplicationPerGroupStats perGroupStats = replicationPerGroupStats.get(0); + assertEquals(perGroupStats.getShardId(), index_1_primary.shardId()); + Set replicaStats = perGroupStats.getReplicaStats(); + assertEquals(1, replicaStats.size()); + for (SegmentReplicationShardStats replica : replicaStats) { + assertNotNull(replica.getCurrentReplicationState()); + } + + replicationPerGroupStats = replicationStats.get(index_2); + assertEquals(1, replicationPerGroupStats.size()); + perGroupStats = replicationPerGroupStats.get(0); + assertEquals(perGroupStats.getShardId(), index_2_primary.shardId()); + replicaStats = perGroupStats.getReplicaStats(); + assertEquals(1, replicaStats.size()); + for (SegmentReplicationShardStats replica : replicaStats) { + assertNotNull(replica.getCurrentReplicationState()); + } + + // test only single index queried. + segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats() + .setIndices(index_2) + .execute() + .actionGet(); + assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size()); + assertTrue(segmentReplicationStatsResponse.getReplicationStats().containsKey(index_2)); + } + + public void testQueryAgainstDocRepIndex() { + internalCluster().startClusterManagerOnlyNode(); + List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startNode(); + nodes.add(primaryNode); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + .build() + ); + ensureYellowAndNoInitializingShards(INDEX_NAME); + nodes.add(internalCluster().startNode()); + ensureGreen(INDEX_NAME); + + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME); + + // search for all + SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .execute() + .actionGet(); + assertTrue(segmentReplicationStatsResponse.getReplicationStats().isEmpty()); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java new file mode 100644 index 0000000000000..b6855507669b6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.replication; + +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.indices.replication.SegmentReplicationState; + +import java.io.IOException; + +/** + * Segment Replication specific response object for fetching stats from either a primary + * or replica shard. The stats returned are different depending on primary or replica. + * + * @opensearch.internal + */ +public class SegmentReplicationShardStatsResponse implements Writeable { + + @Nullable + private final SegmentReplicationPerGroupStats primaryStats; + + @Nullable + private final SegmentReplicationState replicaStats; + + public SegmentReplicationShardStatsResponse(StreamInput in) throws IOException { + this.primaryStats = in.readOptionalWriteable(SegmentReplicationPerGroupStats::new); + this.replicaStats = in.readOptionalWriteable(SegmentReplicationState::new); + } + + public SegmentReplicationShardStatsResponse(SegmentReplicationPerGroupStats primaryStats) { + this.primaryStats = primaryStats; + this.replicaStats = null; + } + + public SegmentReplicationShardStatsResponse(SegmentReplicationState replicaStats) { + this.replicaStats = replicaStats; + this.primaryStats = null; + } + + public SegmentReplicationPerGroupStats getPrimaryStats() { + return primaryStats; + } + + public SegmentReplicationState getReplicaStats() { + return replicaStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(primaryStats); + out.writeOptionalWriteable(replicaStats); + } + + @Override + public String toString() { + return "SegmentReplicationShardStatsResponse{" + "primaryStats=" + primaryStats + ", replicaStats=" + replicaStats + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java index 2f72d7dd3e544..a72455be3713a 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationStatsResponse.java @@ -15,7 +15,7 @@ import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.indices.replication.SegmentReplicationState; +import org.opensearch.index.SegmentReplicationPerGroupStats; import java.io.IOException; import java.util.List; @@ -27,54 +27,50 @@ * @opensearch.internal */ public class SegmentReplicationStatsResponse extends BroadcastResponse { - private final Map> shardSegmentReplicationStates; + private final Map> replicationStats; public SegmentReplicationStatsResponse(StreamInput in) throws IOException { super(in); - shardSegmentReplicationStates = in.readMapOfLists(StreamInput::readString, SegmentReplicationState::new); + replicationStats = in.readMapOfLists(StreamInput::readString, SegmentReplicationPerGroupStats::new); } /** * Constructs segment replication stats information for a collection of indices and associated shards. Keeps track of how many total shards * were seen, and out of those how many were successfully processed and how many failed. * - * @param totalShards Total count of shards seen - * @param successfulShards Count of shards successfully processed - * @param failedShards Count of shards which failed to process - * @param shardSegmentReplicationStates Map of indices to shard replication information - * @param shardFailures List of failures processing shards + * @param totalShards Total count of shards seen + * @param successfulShards Count of shards successfully processed + * @param failedShards Count of shards which failed to process + * @param replicationStats Map of indices to a list of {@link SegmentReplicationPerGroupStats} + * @param shardFailures List of failures processing shards */ public SegmentReplicationStatsResponse( int totalShards, int successfulShards, int failedShards, - Map> shardSegmentReplicationStates, + Map> replicationStats, List shardFailures ) { super(totalShards, successfulShards, failedShards, shardFailures); - this.shardSegmentReplicationStates = shardSegmentReplicationStates; + this.replicationStats = replicationStats; } - public boolean hasSegmentReplicationStats() { - return shardSegmentReplicationStates.size() > 0; - } - - public Map> shardSegmentReplicationStates() { - return shardSegmentReplicationStates; + public Map> getReplicationStats() { + return replicationStats; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - if (hasSegmentReplicationStats()) { - for (String index : shardSegmentReplicationStates.keySet()) { - List segmentReplicationStates = shardSegmentReplicationStates.get(index); + if (replicationStats.size() > 0) { + for (String index : replicationStats.keySet()) { + List segmentReplicationStates = replicationStats.get(index); if (segmentReplicationStates == null || segmentReplicationStates.size() == 0) { continue; } builder.startObject(index); - builder.startArray("shards"); - for (SegmentReplicationState segmentReplicationState : segmentReplicationStates) { + builder.startArray("primary_stats"); + for (SegmentReplicationPerGroupStats segmentReplicationState : segmentReplicationStates) { builder.startObject(); segmentReplicationState.toXContent(builder, params); builder.endObject(); @@ -90,7 +86,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeMapOfLists(shardSegmentReplicationStates, StreamOutput::writeString, (o, v) -> v.writeTo(o)); + out.writeMapOfLists(replicationStats, StreamOutput::writeString, (o, v) -> v.writeTo(o)); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java index 67849002ab811..a94a4a0a8db04 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java @@ -21,6 +21,9 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.index.IndexService; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; @@ -30,12 +33,12 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import java.util.Arrays; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; +import java.util.stream.Collectors; /** * Transport action for shard segment replication operation. This transport action does not actually @@ -46,10 +49,11 @@ public class TransportSegmentReplicationStatsAction extends TransportBroadcastByNodeAction< SegmentReplicationStatsRequest, SegmentReplicationStatsResponse, - SegmentReplicationState> { + SegmentReplicationShardStatsResponse> { private final SegmentReplicationTargetService targetService; private final IndicesService indicesService; + private final SegmentReplicationPressureService pressureService; @Inject public TransportSegmentReplicationStatsAction( @@ -58,7 +62,8 @@ public TransportSegmentReplicationStatsAction( IndicesService indicesService, SegmentReplicationTargetService targetService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + SegmentReplicationPressureService pressureService ) { super( SegmentReplicationStatsAction.NAME, @@ -71,11 +76,12 @@ public TransportSegmentReplicationStatsAction( ); this.indicesService = indicesService; this.targetService = targetService; + this.pressureService = pressureService; } @Override - protected SegmentReplicationState readShardResult(StreamInput in) throws IOException { - return new SegmentReplicationState(in); + protected SegmentReplicationShardStatsResponse readShardResult(StreamInput in) throws IOException { + return new SegmentReplicationShardStatsResponse(in); } @Override @@ -84,35 +90,51 @@ protected SegmentReplicationStatsResponse newResponse( int totalShards, int successfulShards, int failedShards, - List responses, + List responses, List shardFailures, ClusterState clusterState ) { String[] shards = request.shards(); - Set set = new HashSet<>(); - if (shards.length > 0) { - for (String shard : shards) { - set.add(shard); + final List shardsToFetch = Arrays.stream(shards).map(Integer::valueOf).collect(Collectors.toList()); + + // organize replica responses by allocationId. + final Map replicaStats = new HashMap<>(); + // map of index name to list of replication group stats. + final Map> primaryStats = new HashMap<>(); + for (SegmentReplicationShardStatsResponse response : responses) { + if (response != null) { + if (response.getReplicaStats() != null) { + final ShardRouting shardRouting = response.getReplicaStats().getShardRouting(); + if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardRouting.shardId().getId())) { + replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats()); + } + } + if (response.getPrimaryStats() != null) { + final ShardId shardId = response.getPrimaryStats().getShardId(); + if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) { + primaryStats.compute(shardId.getIndexName(), (k, v) -> { + if (v == null) { + final ArrayList list = new ArrayList<>(); + list.add(response.getPrimaryStats()); + return list; + } else { + v.add(response.getPrimaryStats()); + return v; + } + }); + } + } } } - Map> shardResponses = new HashMap<>(); - for (SegmentReplicationState segmentReplicationState : responses) { - if (segmentReplicationState == null) { - continue; + // combine the replica stats to the shard stat entry in each group. + for (Map.Entry> entry : primaryStats.entrySet()) { + for (SegmentReplicationPerGroupStats group : entry.getValue()) { + for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) { + replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null)); + } } - - // Limit responses to only specific shard id's passed in query paramter shards. - int shardId = segmentReplicationState.getShardRouting().shardId().id(); - if (shards.length > 0 && set.contains(Integer.toString(shardId)) == false) { - continue; - } - String indexName = segmentReplicationState.getShardRouting().getIndexName(); - if (!shardResponses.containsKey(indexName)) { - shardResponses.put(indexName, new ArrayList<>()); - } - shardResponses.get(indexName).add(segmentReplicationState); } - return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, shardResponses, shardFailures); + return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures); } @Override @@ -121,25 +143,29 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws } @Override - protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) { + protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) { IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); IndexShard indexShard = indexService.getShard(shardRouting.shardId().id()); ShardId shardId = shardRouting.shardId(); - if (indexShard.indexSettings().isSegRepEnabled() == false || shardRouting.primary()) { + if (indexShard.indexSettings().isSegRepEnabled() == false) { return null; } + if (shardRouting.primary()) { + return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard)); + } + // return information about only on-going segment replication events. if (request.activeOnly()) { - return targetService.getOngoingEventSegmentReplicationState(shardId); + return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId)); } // return information about only latest completed segment replication events. if (request.completedOnly()) { - return targetService.getlatestCompletedEventSegmentReplicationState(shardId); + return new SegmentReplicationShardStatsResponse(targetService.getlatestCompletedEventSegmentReplicationState(shardId)); } - return targetService.getSegmentReplicationState(shardId); + return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId)); } @Override diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java index 664ac89ff57d6..23beef419e7b6 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java @@ -13,6 +13,7 @@ import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.index.shard.ShardId; import java.io.IOException; import java.util.Set; @@ -24,15 +25,18 @@ */ public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment { + private final ShardId shardId; private final Set replicaStats; private final long rejectedRequestCount; - public SegmentReplicationPerGroupStats(Set replicaStats, long rejectedRequestCount) { + public SegmentReplicationPerGroupStats(ShardId shardId, Set replicaStats, long rejectedRequestCount) { + this.shardId = shardId; this.replicaStats = replicaStats; this.rejectedRequestCount = rejectedRequestCount; } public SegmentReplicationPerGroupStats(StreamInput in) throws IOException { + this.shardId = new ShardId(in); this.replicaStats = in.readSet(SegmentReplicationShardStats::new); this.rejectedRequestCount = in.readVLong(); } @@ -45,6 +49,10 @@ public long getRejectedRequestCount() { return rejectedRequestCount; } + public ShardId getShardId() { + return shardId; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("rejected_requests", rejectedRequestCount); @@ -58,6 +66,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); out.writeCollection(replicaStats); out.writeVLong(rejectedRequestCount); } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 874f35daf158f..f31e236fb6184 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -131,6 +131,10 @@ public SegmentReplicationStats nodeStats() { return tracker.getStats(); } + public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) { + return tracker.getStatsForShard(indexShard); + } + public boolean isSegmentReplicationBackpressureEnabled() { return isSegmentReplicationBackpressureEnabled; } @@ -150,5 +154,4 @@ public void setMaxAllowedStaleReplicas(double maxAllowedStaleReplicas) { public void setMaxReplicationTime(TimeValue maxReplicationTime) { this.maxReplicationTime = maxReplicationTime; } - } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java index 8140240110de2..b68c49453222b 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java @@ -8,6 +8,7 @@ package org.opensearch.index; +import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; @@ -15,6 +16,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.indices.replication.SegmentReplicationState; import java.io.IOException; @@ -24,20 +26,23 @@ * @opensearch.internal */ public class SegmentReplicationShardStats implements Writeable, ToXContentFragment { - private final String nodeId; + private final String allocationId; private final long checkpointsBehindCount; private final long bytesBehindCount; private final long currentReplicationTimeMillis; private final long lastCompletedReplicationTimeMillis; + @Nullable + private SegmentReplicationState currentReplicationState; + public SegmentReplicationShardStats( - String nodeId, + String allocationId, long checkpointsBehindCount, long bytesBehindCount, long currentReplicationTimeMillis, long lastCompletedReplicationTime ) { - this.nodeId = nodeId; + this.allocationId = allocationId; this.checkpointsBehindCount = checkpointsBehindCount; this.bytesBehindCount = bytesBehindCount; this.currentReplicationTimeMillis = currentReplicationTimeMillis; @@ -45,15 +50,15 @@ public SegmentReplicationShardStats( } public SegmentReplicationShardStats(StreamInput in) throws IOException { - this.nodeId = in.readString(); + this.allocationId = in.readString(); this.checkpointsBehindCount = in.readVLong(); this.bytesBehindCount = in.readVLong(); this.currentReplicationTimeMillis = in.readVLong(); this.lastCompletedReplicationTimeMillis = in.readVLong(); } - public String getNodeId() { - return nodeId; + public String getAllocationId() { + return allocationId; } public long getCheckpointsBehindCount() { @@ -72,21 +77,35 @@ public long getLastCompletedReplicationTimeMillis() { return lastCompletedReplicationTimeMillis; } + public void setCurrentReplicationState(SegmentReplicationState currentReplicationState) { + this.currentReplicationState = currentReplicationState; + } + + @Nullable + public SegmentReplicationState getCurrentReplicationState() { + return currentReplicationState; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("node_id", nodeId); + builder.field("allocation_id", allocationId); builder.field("checkpoints_behind", checkpointsBehindCount); builder.field("bytes_behind", new ByteSizeValue(bytesBehindCount).toString()); builder.field("current_replication_time", new TimeValue(currentReplicationTimeMillis)); builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis)); + if (currentReplicationState != null) { + builder.startObject(); + currentReplicationState.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(nodeId); + out.writeString(allocationId); out.writeVLong(checkpointsBehindCount); out.writeVLong(bytesBehindCount); out.writeVLong(currentReplicationTimeMillis); @@ -96,17 +115,18 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { return "SegmentReplicationShardStats{" - + "nodeId='" - + nodeId - + '\'' + + "allocationId=" + + allocationId + ", checkpointsBehindCount=" + checkpointsBehindCount + ", bytesBehindCount=" + bytesBehindCount - + ", currentReplicationLag=" + + ", currentReplicationTimeMillis=" + currentReplicationTimeMillis - + ", lastCompletedLag=" + + ", lastCompletedReplicationTimeMillis=" + lastCompletedReplicationTimeMillis + + ", currentReplicationState=" + + currentReplicationState + '}'; } } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java index f4629b46d9e02..10975e48443d8 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java @@ -28,7 +28,7 @@ public class SegmentReplicationStats implements Writeable, ToXContentFragment { private final Map shardStats; - public SegmentReplicationStats(Map shardStats) { + public SegmentReplicationStats(final Map shardStats) { this.shardStats = shardStats; } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index 201bdc97e2466..d7176127615d5 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -26,7 +26,7 @@ public class SegmentReplicationStatsTracker { private final IndicesService indicesService; - private Map rejectionCount; + private final Map rejectionCount; public SegmentReplicationStatsTracker(IndicesService indicesService) { this.indicesService = indicesService; @@ -38,13 +38,7 @@ public SegmentReplicationStats getStats() { for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) { - stats.putIfAbsent( - indexShard.shardId(), - new SegmentReplicationPerGroupStats( - indexShard.getReplicationStats(), - Optional.ofNullable(rejectionCount.get(indexShard.shardId())).map(AtomicInteger::get).orElse(0) - ) - ); + stats.putIfAbsent(indexShard.shardId(), getStatsForShard(indexShard)); } } } @@ -61,4 +55,12 @@ public void incrementRejectionCount(ShardId shardId) { } }); } + + public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) { + return new SegmentReplicationPerGroupStats( + indexShard.shardId(), + indexShard.getReplicationStats(), + Optional.ofNullable(rejectionCount.get(indexShard.shardId())).map(AtomicInteger::get).orElse(0) + ); + } } diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 4715d1d4ddd5b..6edf008fc35b3 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -71,7 +71,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -1263,7 +1262,7 @@ private SegmentReplicationShardStats buildShardStats( ) { final Map checkpointTimers = checkpointState.checkpointTimers; return new SegmentReplicationShardStats( - Optional.ofNullable(this.routingTable.getByAllocationId(allocationId)).map(ShardRouting::currentNodeId).orElse("not assigned"), + allocationId, checkpointTimers.size(), checkpointState.visibleReplicationCheckpoint == null ? latestCheckpointLength diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java index 15070c723c2c8..08e7b89e68508 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationAction.java @@ -15,8 +15,11 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.common.Strings; import org.opensearch.common.Table; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentOpenSearchExtension; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestHandler; @@ -24,9 +27,10 @@ import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestResponseListener; -import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; @@ -72,7 +76,7 @@ public BaseRestHandler.RestChannelConsumer doCatRequest(final RestRequest reques return channel -> client.admin() .indices() - .segmentReplicationStats(segmentReplicationStatsRequest, new RestResponseListener(channel) { + .segmentReplicationStats(segmentReplicationStatsRequest, new RestResponseListener<>(channel) { @Override public RestResponse buildResponse(final SegmentReplicationStatsResponse response) throws Exception { return RestTable.buildResponse(buildSegmentReplicationTable(request, response), channel); @@ -90,22 +94,23 @@ protected Table getTableWithHeader(RestRequest request) { Table t = new Table(); t.startHeaders() - .addCell("index", "alias:i,idx;desc:index name") .addCell("shardId", "alias:s;desc: shard Id") - .addCell("time", "alias:t,ti;desc:segment replication time") - .addCell("stage", "alias:st;desc:segment replication stage") - .addCell("source_description", "alias:sdesc;desc:source description") - .addCell("target_host", "alias:thost;desc:target host") .addCell("target_node", "alias:tnode;desc:target node name") - .addCell("files_fetched", "alias:ff;desc:files fetched") - .addCell("files_percent", "alias:fp;desc:percent of files fetched") - .addCell("bytes_fetched", "alias:bf;desc:bytes fetched") - .addCell("bytes_percent", "alias:bp;desc:percent of bytes fetched"); + .addCell("target_host", "alias:thost;desc:target host") + .addCell("checkpoints_behind", "alias:cpb;desc:checkpoints behind primary") + .addCell("bytes_behind", "alias:bb;desc:bytes behind primary") + .addCell("current_lag", "alias:clag;desc:ongoing time elapsed waiting for replica to catch up to primary") + .addCell("last_completed_lag", "alias:lcl;desc:time taken for replica to catch up to latest primary refresh") + .addCell("rejected_requests", "alias:rr;desc:count of rejected requests for the replication group"); if (detailed) { - t.addCell("start_time", "alias:start;desc:segment replication start time") - .addCell("start_time_millis", "alias:start_millis;desc:segment replication start time in epoch milliseconds") + t.addCell("stage", "alias:st;desc:segment replication event stage") + .addCell("time", "alias:t,ti;desc:current replication event time") + .addCell("files_fetched", "alias:ff;desc:files fetched") + .addCell("files_percent", "alias:fp;desc:percent of files fetched") + .addCell("bytes_fetched", "alias:bf;desc:bytes fetched") + .addCell("bytes_percent", "alias:bp;desc:percent of bytes fetched") + .addCell("start_time", "alias:start;desc:segment replication start time") .addCell("stop_time", "alias:stop;desc:segment replication stop time") - .addCell("stop_time_millis", "alias:stop_millis;desc:segment replication stop time in epoch milliseconds") .addCell("files", "alias:f;desc:number of files to fetch") .addCell("files_total", "alias:tf;desc:total number of files") .addCell("bytes", "alias:b;desc:number of bytes to fetch") @@ -135,58 +140,61 @@ public Table buildSegmentReplicationTable(RestRequest request, SegmentReplicatio } Table t = getTableWithHeader(request); - for (String index : response.shardSegmentReplicationStates().keySet()) { + for (Map.Entry> entry : response.getReplicationStats().entrySet()) { + final List replicationPerGroupStats = entry.getValue(); - List shardSegmentReplicationStates = response.shardSegmentReplicationStates().get(index); - if (shardSegmentReplicationStates.size() == 0) { + if (replicationPerGroupStats.isEmpty()) { continue; } // Sort ascending by shard id for readability - CollectionUtil.introSort(shardSegmentReplicationStates, new Comparator() { - @Override - public int compare(SegmentReplicationState o1, SegmentReplicationState o2) { - int id1 = o1.getShardRouting().shardId().id(); - int id2 = o2.getShardRouting().shardId().id(); - if (id1 < id2) { - return -1; - } else if (id1 > id2) { - return 1; - } else { - return 0; - } - } + CollectionUtil.introSort(replicationPerGroupStats, (o1, o2) -> { + int id1 = o1.getShardId().id(); + int id2 = o2.getShardId().id(); + return Integer.compare(id1, id2); }); - for (SegmentReplicationState state : shardSegmentReplicationStates) { - t.startRow(); - t.addCell(index); - t.addCell(state.getShardRouting().shardId().id()); - t.addCell(new TimeValue(state.getTimer().time())); - t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT)); - t.addCell(state.getSourceDescription()); - t.addCell(state.getTargetNode().getHostName()); - t.addCell(state.getTargetNode().getName()); - t.addCell(state.getIndex().recoveredFileCount()); - t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredFilesPercent())); - t.addCell(state.getIndex().recoveredBytes()); - t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent())); - if (detailed) { - t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime())); - t.addCell(state.getTimer().startTime()); - t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime())); - t.addCell(state.getTimer().stopTime()); - t.addCell(state.getIndex().totalRecoverFiles()); - t.addCell(state.getIndex().totalFileCount()); - t.addCell(state.getIndex().totalRecoverBytes()); - t.addCell(state.getIndex().totalBytes()); - t.addCell(state.getReplicatingStageTime()); - t.addCell(state.getGetCheckpointInfoStageTime()); - t.addCell(state.getFileDiffStageTime()); - t.addCell(state.getGetFileStageTime()); - t.addCell(state.getFinalizeReplicationStageTime()); + for (SegmentReplicationPerGroupStats perGroupStats : replicationPerGroupStats) { + + final Set replicaShardStats = perGroupStats.getReplicaStats(); + + for (SegmentReplicationShardStats shardStats : replicaShardStats) { + final SegmentReplicationState state = shardStats.getCurrentReplicationState(); + if (state == null) { + continue; + } + + t.startRow(); + t.addCell(perGroupStats.getShardId()); + // these nulls should never happen, here for safety. + t.addCell(state.getTargetNode().getName()); + t.addCell(state.getTargetNode().getHostName()); + t.addCell(shardStats.getCheckpointsBehindCount()); + t.addCell(new ByteSizeValue(shardStats.getBytesBehindCount())); + t.addCell(new TimeValue(shardStats.getCurrentReplicationTimeMillis())); + t.addCell(new TimeValue(shardStats.getLastCompletedReplicationTimeMillis())); + t.addCell(perGroupStats.getRejectedRequestCount()); + if (detailed) { + t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT)); + t.addCell(new TimeValue(state.getTimer().time())); + t.addCell(state.getIndex().recoveredFileCount()); + t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredFilesPercent())); + t.addCell(state.getIndex().recoveredBytes()); + t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent())); + t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime())); + t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime())); + t.addCell(state.getIndex().totalRecoverFiles()); + t.addCell(state.getIndex().totalFileCount()); + t.addCell(new ByteSizeValue(state.getIndex().totalRecoverBytes())); + t.addCell(new ByteSizeValue(state.getIndex().totalBytes())); + t.addCell(state.getReplicatingStageTime()); + t.addCell(state.getGetCheckpointInfoStageTime()); + t.addCell(state.getFileDiffStageTime()); + t.addCell(state.getGetFileStageTime()); + t.addCell(state.getFinalizeReplicationStageTime()); + } + t.endRow(); } - t.endRow(); } } diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java index ec2b635d2d608..7a0d80d9538ad 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestCatSegmentReplicationActionTests.java @@ -11,43 +11,56 @@ import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.support.DefaultShardOperationFailedException; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.AllocationId; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Randomness; import org.opensearch.common.Table; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.Index; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; +import java.util.UUID; +import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class RestCatSegmentReplicationActionTests extends OpenSearchTestCase { - public void testSegmentReplicationAction() { + public void testSegmentReplicationAction() throws IOException { final RestCatSegmentReplicationAction action = new RestCatSegmentReplicationAction(); final int totalShards = randomIntBetween(1, 32); final int successfulShards = Math.max(0, totalShards - randomIntBetween(1, 2)); final int failedShards = totalShards - successfulShards; - final Map> shardSegmentReplicationStates = new HashMap<>(); - final List segmentReplicationStates = new ArrayList<>(); + final Map> shardSegmentReplicationStates = new HashMap<>(); + final List groupStats = new ArrayList<>(); + final long rejectedRequestCount = 5L; for (int i = 0; i < successfulShards; i++) { + final ShardId shardId = new ShardId(new Index("index", "_na_"), i); final SegmentReplicationState state = mock(SegmentReplicationState.class); final ShardRouting shardRouting = mock(ShardRouting.class); when(state.getShardRouting()).thenReturn(shardRouting); - when(shardRouting.shardId()).thenReturn(new ShardId(new Index("index", "_na_"), i)); + + when(shardRouting.shardId()).thenReturn(shardId); + final AllocationId aId = mock(AllocationId.class); + when(aId.getId()).thenReturn(UUID.randomUUID().toString()); + when(shardRouting.allocationId()).thenReturn(aId); when(state.getReplicationId()).thenReturn(randomLongBetween(0, 1000)); final ReplicationTimer timer = mock(ReplicationTimer.class); final long startTime = randomLongBetween(0, new Date().getTime()); @@ -60,19 +73,30 @@ public void testSegmentReplicationAction() { when(state.getSourceDescription()).thenReturn("Source"); final DiscoveryNode targetNode = mock(DiscoveryNode.class); when(targetNode.getHostName()).thenReturn(randomAlphaOfLength(8)); + when(targetNode.getName()).thenReturn(UUID.randomUUID().toString()); when(state.getTargetNode()).thenReturn(targetNode); ReplicationLuceneIndex index = createTestIndex(); when(state.getIndex()).thenReturn(index); - // - - segmentReplicationStates.add(state); + final SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats( + state.getShardRouting().allocationId().getId(), + 0L, + 0L, + 0L, + 0L + ); + segmentReplicationShardStats.setCurrentReplicationState(state); + final SegmentReplicationPerGroupStats perGroupStats = new SegmentReplicationPerGroupStats( + shardId, + Set.of(segmentReplicationShardStats), + rejectedRequestCount + ); + groupStats.add(perGroupStats); } - final List shuffle = new ArrayList<>(segmentReplicationStates); - Randomness.shuffle(shuffle); - shardSegmentReplicationStates.put("index", shuffle); + Randomness.shuffle(groupStats); + shardSegmentReplicationStates.put("index", groupStats); final List shardFailures = new ArrayList<>(); final SegmentReplicationStatsResponse response = new SegmentReplicationStatsResponse( @@ -88,18 +112,15 @@ public void testSegmentReplicationAction() { List headers = table.getHeaders(); - final List expectedHeaders = Arrays.asList( - "index", + final List expectedHeaders = asList( "shardId", - "time", - "stage", - "source_description", - "target_host", "target_node", - "files_fetched", - "files_percent", - "bytes_fetched", - "bytes_percent" + "target_host", + "checkpoints_behind", + "bytes_behind", + "current_lag", + "last_completed_lag", + "rejected_requests" ); for (int i = 0; i < expectedHeaders.size(); i++) { @@ -109,19 +130,20 @@ public void testSegmentReplicationAction() { assertThat(table.getRows().size(), equalTo(successfulShards)); for (int i = 0; i < successfulShards; i++) { - final SegmentReplicationState state = segmentReplicationStates.get(i); - final List expectedValues = Arrays.asList( - "index", - i, - new TimeValue(state.getTimer().time()), - state.getStage().name().toLowerCase(Locale.ROOT), - state.getSourceDescription(), - state.getTargetNode().getHostName(), - state.getTargetNode().getName(), - state.getIndex().recoveredFileCount(), - percent(state.getIndex().recoveredFilesPercent()), - state.getIndex().recoveredBytes(), - percent(state.getIndex().recoveredBytesPercent()) + final SegmentReplicationPerGroupStats perGroupStats = groupStats.get(i); + final Set replicaStats = perGroupStats.getReplicaStats(); + assertEquals(1, replicaStats.size()); + final SegmentReplicationShardStats shardStats = replicaStats.stream().findFirst().get(); + final SegmentReplicationState currentReplicationState = shardStats.getCurrentReplicationState(); + final List expectedValues = asList( + perGroupStats.getShardId(), + currentReplicationState.getTargetNode().getName(), + currentReplicationState.getTargetNode().getHostName(), + shardStats.getCheckpointsBehindCount(), + new ByteSizeValue(shardStats.getBytesBehindCount()), + new TimeValue(shardStats.getCurrentReplicationTimeMillis()), + new TimeValue(shardStats.getLastCompletedReplicationTimeMillis()), + rejectedRequestCount ); final List cells = table.getRows().get(i);