diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardMonitoringDoc.java index f034dd0fc3a7e..37eab685521a7 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardMonitoringDoc.java @@ -32,10 +32,20 @@ public class ShardMonitoringDoc extends FilteredMonitoringDoc { final long interval, final MonitoringDoc.Node node, final ShardRouting shardRouting, - final String clusterStateUUID + final String clusterStateUUID, + final int shardIndex ) { - super(cluster, timestamp, interval, node, MonitoredSystem.ES, TYPE, id(clusterStateUUID, shardRouting), XCONTENT_FILTERS); + super( + cluster, + timestamp, + interval, + node, + MonitoredSystem.ES, + TYPE, + id(clusterStateUUID, shardRouting, shardIndex), + XCONTENT_FILTERS + ); this.shardRouting = Objects.requireNonNull(shardRouting); this.clusterStateUUID = Objects.requireNonNull(clusterStateUUID); } @@ -61,9 +71,9 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO /** * Compute an id that has the format: * - * {state_uuid}:{node_id || '_na'}:{index}:{shard}:{'p' || 'r'} + * {state_uuid}:{node_id || '_na'}:{index}:s{shard}:{'p' || 'rX'} */ - public static String id(String stateUUID, ShardRouting shardRouting) { + public static String id(String stateUUID, ShardRouting shardRouting, int shardIndex) { StringBuilder builder = new StringBuilder(); builder.append(stateUUID); builder.append(':'); @@ -74,13 +84,14 @@ public static String id(String stateUUID, ShardRouting shardRouting) { } builder.append(':'); builder.append(shardRouting.getIndexName()); - builder.append(':'); + builder.append(":s"); builder.append(Integer.valueOf(shardRouting.id())); builder.append(':'); if (shardRouting.primary()) { builder.append("p"); } else { builder.append("r"); + builder.append(Integer.valueOf(shardIndex)); } return builder.toString(); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java index dde8fb7353a5d..276935e599fd2 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java @@ -8,6 +8,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -20,7 +22,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * Collector for shards. @@ -46,23 +50,46 @@ protected Collection doCollect(final MonitoringDoc.Node node, fin if (clusterState != null) { RoutingTable routingTable = clusterState.routingTable(); if (routingTable != null) { - List shards = routingTable.allShards(); - if (shards != null) { - final String clusterUuid = clusterUuid(clusterState); - final String stateUUID = clusterState.stateUUID(); - final long timestamp = timestamp(); + final String clusterUuid = clusterUuid(clusterState); + final String stateUUID = clusterState.stateUUID(); + final long timestamp = timestamp(); - final String[] indices = getCollectionIndices(); - final boolean isAllIndices = IndexNameExpressionResolver.isAllIndices(Arrays.asList(indices)); + final String[] indicesToMonitor = getCollectionIndices(); + final boolean isAllIndices = IndexNameExpressionResolver.isAllIndices(Arrays.asList(indicesToMonitor)); + final String[] indices = isAllIndices + ? routingTable.indicesRouting().keySet().toArray(new String[0]) + : expandIndexPattern(indicesToMonitor, routingTable.indicesRouting().keySet().toArray(new String[0])); - for (ShardRouting shard : shards) { - if (isAllIndices || Regex.simpleMatch(indices, shard.getIndexName())) { - MonitoringDoc.Node shardNode = null; - if (shard.assignedToNode()) { + for (String index : indices) { + IndexRoutingTable indexRoutingTable = routingTable.index(index); + if (indexRoutingTable != null) { + final int shardCount = indexRoutingTable.size(); + for (int i = 0; i < shardCount; i++) { + IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(i); + + ShardRouting primary = shardRoutingTable.primaryShard(); + MonitoringDoc.Node primaryShardNode = null; + if (primary.assignedToNode()) { // If the shard is assigned to a node, the shard monitoring document refers to this node - shardNode = convertNode(node.getTimestamp(), clusterState.getNodes().get(shard.currentNodeId())); + primaryShardNode = convertNode(node.getTimestamp(), clusterState.getNodes().get(primary.currentNodeId())); + } + results.add(new ShardMonitoringDoc(clusterUuid, timestamp, interval, primaryShardNode, primary, stateUUID, 0)); + + List replicas = shardRoutingTable.replicaShards(); + for (int j = 0; j < replicas.size(); j++) { + ShardRouting replica = replicas.get(j); + + MonitoringDoc.Node replicaShardNode = null; + if (replica.assignedToNode()) { + replicaShardNode = convertNode( + node.getTimestamp(), + clusterState.getNodes().get(replica.currentNodeId()) + ); + } + results.add( + new ShardMonitoringDoc(clusterUuid, timestamp, interval, replicaShardNode, replica, stateUUID, j + 1) + ); } - results.add(new ShardMonitoringDoc(clusterUuid, timestamp, interval, shardNode, shard, stateUUID)); } } } @@ -70,4 +97,22 @@ protected Collection doCollect(final MonitoringDoc.Node node, fin } return Collections.unmodifiableCollection(results); } + + private String[] expandIndexPattern(String[] indicesToMonitor, String[] indices) { + final Set expandedIndices = new HashSet<>(); + + for (String indexOrPattern : indicesToMonitor) { + if (indexOrPattern.contains("*")) { + for (String index : indices) { + if (Regex.simpleMatch(indexOrPattern, index)) { + expandedIndices.add(index); + } + } + } else { + expandedIndices.add(indexOrPattern); + } + } + + return expandedIndices.toArray(new String[0]); + } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollectorTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollectorTests.java index 08949d77c57a0..8bfec539f1282 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollectorTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollectorTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.monitoring.collector.shards; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -20,7 +22,10 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.UUID; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; @@ -30,6 +35,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.matchesPattern; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.eq; @@ -77,7 +83,12 @@ public void testDoCollect() throws Exception { final String stateUUID = UUID.randomUUID().toString(); when(clusterState.stateUUID()).thenReturn(stateUUID); - final String[] indices = randomFrom(NONE, Strings.EMPTY_ARRAY, new String[] { "_all" }, new String[] { "_index*" }); + final String[] indices = randomFrom( + NONE, + Strings.EMPTY_ARRAY, + new String[] { "_all" }, + new String[] { "_index*", "_does-not-exist" } + ); withCollectionIndices(indices); final RoutingTable routingTable = mockRoutingTable(); @@ -111,7 +122,11 @@ public void testDoCollect() throws Exception { assertThat(document.getIntervalMillis(), equalTo(interval)); assertThat(document.getSystem(), is(MonitoredSystem.ES)); assertThat(document.getType(), equalTo(ShardMonitoringDoc.TYPE)); - assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUUID, document.getShardRouting()))); + if (document.getShardRouting().primary()) { + assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUUID, document.getShardRouting(), 0))); + } else { + assertThat(document.getId(), matchesPattern("^[\\w-]+:(_current|_na)+:_index:s\\d:r[1-9]+[0-9]*")); + } assertThat(document.getClusterStateUUID(), equalTo(stateUUID)); if (document.getShardRouting().assignedToNode()) { @@ -130,15 +145,60 @@ public void testDoCollect() throws Exception { private static RoutingTable mockRoutingTable() { final List allShards = new ArrayList<>(); - final int nbShards = randomIntBetween(0, 10); - for (int i = 0; i < nbShards; i++) { + final int numberOfPrimaryShards = randomIntBetween(0, 10); + for (int i = 0; i < numberOfPrimaryShards; i++) { ShardRoutingState state = randomFrom(STARTED, UNASSIGNED); ShardId shardId = new ShardId("_index", randomAlphaOfLength(12), i); - allShards.add(TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, true, state)); + ShardRouting primary = TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, true, state); + allShards.add(primary); + } + + final int numberOfReplicaShards = randomIntBetween(0, 3); + for (int i = 0; i < numberOfPrimaryShards; i++) { + for (int j = 0; j < numberOfReplicaShards; j++) { + ShardRoutingState state = randomFrom(STARTED, UNASSIGNED); + ShardId shardId = new ShardId("_index", randomAlphaOfLength(12), i); + ShardRouting replica = TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, false, state); + allShards.add(replica); + } } final RoutingTable routingTable = mock(RoutingTable.class); + // _index* matches the test data above + when(routingTable.allShards("_index*")).thenReturn(allShards); + // _all is reserved to mean all indices in the routing table + when(routingTable.allShards("_all")).thenReturn(allShards); + // When collection indices is set to [], it's treated the same as "_all", so the key set of the routing table is used to grab the + // index names + when(routingTable.allShards("_index")).thenReturn(allShards); + // This mock routing table only has the index named "_index", so if collection indices is set to ["_none"] no shards should be + // found. + when(routingTable.allShards("_none")).thenReturn(new ArrayList<>(0)); + + final IndexRoutingTable indexRoutingTable = mock(IndexRoutingTable.class); + final Map indicesRouting = Map.of("_index", indexRoutingTable); + when(routingTable.indicesRouting()).thenReturn(indicesRouting); + when(routingTable.index("_index")).thenReturn(indexRoutingTable); + + when(indexRoutingTable.size()).thenReturn(numberOfPrimaryShards); + for (int i = 0; i < numberOfPrimaryShards; i++) { + final IndexShardRoutingTable shardRoutingTable = mock(IndexShardRoutingTable.class); + when(indexRoutingTable.shard(i)).thenReturn(shardRoutingTable); + when(shardRoutingTable.primaryShard()).thenReturn(allShards.get(i)); + List replicas = new ArrayList<>(); + int replicaIndexStart = numberOfPrimaryShards + i * numberOfReplicaShards; + int replicaIndexEnd = replicaIndexStart + numberOfReplicaShards; + for (int j = replicaIndexStart; j < replicaIndexEnd; j++) { + replicas.add(allShards.get(j)); + } + when(shardRoutingTable.replicaShards()).thenReturn(replicas); + } + + // This is only used by the test to decide how many shards should be covered when(routingTable.allShards()).thenReturn(allShards); + + Collections.shuffle(allShards, new Random(numberOfPrimaryShards)); + return routingTable; } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsMonitoringDocTests.java index b23130f7b6a38..b001ad42ad3c4 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsMonitoringDocTests.java @@ -58,14 +58,14 @@ protected ShardMonitoringDoc createMonitoringDoc( String type, String id ) { - return new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, stateUuid); + return new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, stateUuid, shardRouting.primary() ? 0 : 1); } @Override protected void assertFilteredMonitoringDoc(final ShardMonitoringDoc document) { assertThat(document.getSystem(), is(MonitoredSystem.ES)); assertThat(document.getType(), is(ShardMonitoringDoc.TYPE)); - assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUuid, shardRouting))); + assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUuid, shardRouting, shardRouting.primary() ? 0 : 1))); assertThat(document.getShardRouting(), is(shardRouting)); if (assignedToNode) { @@ -82,31 +82,31 @@ protected Set getExpectedXContentFilters() { } public void testConstructorShardRoutingMustNotBeNull() { - expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, null, stateUuid)); + expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, null, stateUuid, 0)); } public void testConstructorStateUuidMustNotBeNull() { - expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, null)); + expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, null, 0)); } public void testIdWithPrimaryShardAssigned() { shardRouting = newShardRouting("_index_0", 123, "_node_0", randomAlphaOfLength(5), true, INITIALIZING); - assertEquals("_state_uuid_0:_node_0:_index_0:123:p", ShardMonitoringDoc.id("_state_uuid_0", shardRouting)); + assertEquals("_state_uuid_0:_node_0:_index_0:s123:p", ShardMonitoringDoc.id("_state_uuid_0", shardRouting, 0)); } public void testIdWithReplicaShardAssigned() { shardRouting = newShardRouting("_index_1", 456, "_node_1", randomAlphaOfLength(5), false, INITIALIZING); - assertEquals("_state_uuid_1:_node_1:_index_1:456:r", ShardMonitoringDoc.id("_state_uuid_1", shardRouting)); + assertEquals("_state_uuid_1:_node_1:_index_1:s456:r1", ShardMonitoringDoc.id("_state_uuid_1", shardRouting, 1)); } public void testIdWithPrimaryShardUnassigned() { shardRouting = newShardRouting("_index_2", 789, null, true, UNASSIGNED); - assertEquals("_state_uuid_2:_na:_index_2:789:p", ShardMonitoringDoc.id("_state_uuid_2", shardRouting)); + assertEquals("_state_uuid_2:_na:_index_2:s789:p", ShardMonitoringDoc.id("_state_uuid_2", shardRouting, 0)); } public void testIdWithReplicaShardUnassigned() { shardRouting = newShardRouting("_index_3", 159, null, false, UNASSIGNED); - assertEquals("_state_uuid_3:_na:_index_3:159:r", ShardMonitoringDoc.id("_state_uuid_3", shardRouting)); + assertEquals("_state_uuid_3:_na:_index_3:s159:r1", ShardMonitoringDoc.id("_state_uuid_3", shardRouting, 1)); } @Override @@ -119,7 +119,8 @@ public void testToXContent() throws IOException { 1506593717631L, node, shardRouting, - "_state_uuid" + "_state_uuid", + 0 ); final BytesReference xContent = XContentHelper.toXContent(doc, XContentType.JSON, randomBoolean());