diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java
index be89154358251..3b56c07cb10c8 100644
--- a/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/ClientTimeoutIT.java
@@ -17,15 +17,10 @@
 import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.opensearch.action.admin.indices.recovery.RecoveryAction;
 import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
-import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
-import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
 import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
 import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
-import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.util.FeatureFlags;
-import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.test.transport.MockTransportService;
@@ -51,11 +46,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
         return Collections.singletonList(MockTransportService.TestPlugin.class);
     }
 
-    @Override
-    protected boolean addMockInternalEngine() {
-        return false;
-    }
-
     public void testNodesInfoTimeout() {
         String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
         String dataNode = internalCluster().startDataOnlyNode();
@@ -157,55 +147,6 @@ public void testRecoveriesWithTimeout() {
         assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
     }
 
-    public void testSegmentReplicationStatsWithTimeout() {
-        internalCluster().startClusterManagerOnlyNode(
-            Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
-        );
-        String dataNode = internalCluster().startDataOnlyNode(
-            Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
-        );
-        String anotherDataNode = internalCluster().startDataOnlyNode(
-            Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
-        );
-
-        int numShards = 4;
-        assertAcked(
-            prepareCreate(
-                "test-index",
-                0,
-                Settings.builder()
-                    .put("number_of_shards", numShards)
-                    .put("number_of_replicas", 1)
-                    .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
-            )
-        );
-        ensureGreen();
-        final long numDocs = scaledRandomIntBetween(50, 100);
-        for (int i = 0; i < numDocs; i++) {
-            index("test-index", "doc", Integer.toString(i));
-        }
-        refresh("test-index");
-        ensureSearchable("test-index");
-
-        // Happy case
-        SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
-            .indices()
-            .prepareSegmentReplicationStats()
-            .get();
-        assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
-        assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));
-
-        // simulate timeout on bad node.
-        simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME);
-
-        // verify response with bad node.
-        segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get();
-        assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
-        assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards));
-        assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards));
-        assertThat(segmentReplicationStatsResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
-    }
-
     public void testStatsWithTimeout() {
         internalCluster().startClusterManagerOnlyNode();
         String dataNode = internalCluster().startDataOnlyNode();
diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java
index d32a9df6d0d77..b216824f420cc 100644
--- a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java
@@ -140,7 +140,7 @@ protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest
             singleIndexWithSegmentReplicationDisabled = shardRouting.getIndexName();
             return null;
         }
-        if (indexShard.indexSettings().isSegRepEnabled() == false) {
+        if (indexShard.indexSettings().isSegRepEnabled() == false || shardRouting.primary()) {
             return null;
         }
 
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java
index 9d978dcd46ae9..50649cabf4f15 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java
@@ -22,8 +22,8 @@
 import org.opensearch.indices.replication.common.ReplicationTimer;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * ReplicationState implementation to track Segment Replication events.
@@ -76,10 +76,9 @@ public static Stage fromId(byte id) {
 
     private Stage stage;
     private ReplicationLuceneIndex index;
-    private final ReplicationTimer overallTimer;
-    // Timing data will have as many entries as stages, plus one
-    private final Map<String, Long> timingData = new ConcurrentHashMap<>(Stage.values().length + 1);
 
+    private final ReplicationTimer overallTimer;
+    private final Map<String, Long> timingData;
     private final ReplicationTimer stageTimer;
     private long replicationId;
     private final ShardRouting shardRouting;
@@ -153,6 +152,8 @@ public SegmentReplicationState(
         this.replicationId = replicationId;
         this.sourceDescription = sourceDescription;
         this.targetNode = targetNode;
+        // Timing data will have as many entries as stages, plus one
+        timingData = new HashMap<>(Stage.values().length + 1);
         overallTimer = new ReplicationTimer();
         stageTimer = new ReplicationTimer();
         setStage(Stage.INIT);
@@ -166,6 +167,7 @@ public SegmentReplicationState(StreamInput in) throws IOException {
         replicationId = in.readLong();
         overallTimer = new ReplicationTimer(in);
         stageTimer = new ReplicationTimer(in);
+        timingData = in.readMap(StreamInput::readString, StreamInput::readLong);
         sourceDescription = in.readString();
         targetNode = new DiscoveryNode(in);
     }
@@ -178,6 +180,13 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeLong(replicationId);
         overallTimer.writeTo(out);
         stageTimer.writeTo(out);
+
+        // Copy of timingData is created to avoid concurrent modification of the timingData map.
+        Map<String, Long> timingDataCopy = new HashMap<>();
+        for (Map.Entry<String, Long> entry : timingData.entrySet()) {
+            timingDataCopy.put(entry.getKey(), entry.getValue());
+        }
+        out.writeMap(timingDataCopy, StreamOutput::writeString, StreamOutput::writeLong);
         out.writeString(sourceDescription);
         targetNode.writeTo(out);
     }
@@ -270,11 +279,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
         builder.startObject(SegmentReplicationState.Fields.INDEX);
         index.toXContent(builder, params);
         builder.endObject();
-        builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime());
-        builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime());
-        builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime());
-        builder.field(Fields.GET_FILES_STAGE, getGetFileStageTime());
-        builder.field(Fields.FINALIZE_REPLICATION_STAGE, getFinalizeReplicationStageTime());
+        if (timingData.isEmpty() == false) {
+            builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime());
+            builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime());
+            builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime());
+            builder.field(Fields.GET_FILES_STAGE, getGetFileStageTime());
+            builder.field(Fields.FINALIZE_REPLICATION_STAGE, getFinalizeReplicationStageTime());
+        }
         return builder;
     }