Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Monitoring] Make unassigned replica shard documents unique #91153

Merged
merged 11 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,20 @@ public class ShardMonitoringDoc extends FilteredMonitoringDoc {
final long interval,
final MonitoringDoc.Node node,
final ShardRouting shardRouting,
final String clusterStateUUID
final String clusterStateUUID,
final int shardIndex
) {

super(cluster, timestamp, interval, node, MonitoredSystem.ES, TYPE, id(clusterStateUUID, shardRouting), XCONTENT_FILTERS);
super(
cluster,
timestamp,
interval,
node,
MonitoredSystem.ES,
TYPE,
id(clusterStateUUID, shardRouting, shardIndex),
XCONTENT_FILTERS
);
this.shardRouting = Objects.requireNonNull(shardRouting);
this.clusterStateUUID = Objects.requireNonNull(clusterStateUUID);
}
Expand All @@ -61,9 +71,9 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO
/**
* Compute an id that has the format:
*
* {state_uuid}:{node_id || '_na'}:{index}:{shard}:{'p' || 'r'}
* {state_uuid}:{node_id || '_na'}:{index}:s{shard}:{'p' || 'rX'}
*/
public static String id(String stateUUID, ShardRouting shardRouting) {
public static String id(String stateUUID, ShardRouting shardRouting, int shardIndex) {
StringBuilder builder = new StringBuilder();
builder.append(stateUUID);
builder.append(':');
Expand All @@ -74,13 +84,14 @@ public static String id(String stateUUID, ShardRouting shardRouting) {
}
builder.append(':');
builder.append(shardRouting.getIndexName());
builder.append(':');
builder.append(":s");
builder.append(Integer.valueOf(shardRouting.id()));
builder.append(':');
if (shardRouting.primary()) {
builder.append("p");
} else {
builder.append("r");
builder.append(Integer.valueOf(shardIndex));
}
return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -20,7 +22,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Collector for shards.
Expand All @@ -46,28 +50,69 @@ protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node, fin
if (clusterState != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file has a lot of these nested levels which we could get rid of by flipping the guard statements, but I'm not sure what the idiomatic style is here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is pretty deeply nested, but I don't think it's so bad that it needs a refactoring right now. A lot of this code is deprecated and on a course to being a distant memory when the internal monitoring feature is removed.

RoutingTable routingTable = clusterState.routingTable();
if (routingTable != null) {
List<ShardRouting> shards = routingTable.allShards();
if (shards != null) {
final String clusterUuid = clusterUuid(clusterState);
final String stateUUID = clusterState.stateUUID();
final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState);
final String stateUUID = clusterState.stateUUID();
final long timestamp = timestamp();

final String[] indices = getCollectionIndices();
final boolean isAllIndices = IndexNameExpressionResolver.isAllIndices(Arrays.asList(indices));
final String[] indicesToMonitor = getCollectionIndices();
final boolean isAllIndices = IndexNameExpressionResolver.isAllIndices(Arrays.asList(indicesToMonitor));
final String[] indices = isAllIndices
? routingTable.indicesRouting().keySet().toArray(new String[0])
: expandIndexPattern(indicesToMonitor, routingTable.indicesRouting().keySet().toArray(new String[0]));

for (ShardRouting shard : shards) {
if (isAllIndices || Regex.simpleMatch(indices, shard.getIndexName())) {
MonitoringDoc.Node shardNode = null;
if (shard.assignedToNode()) {
for (String index : indices) {
IndexRoutingTable indexRoutingTable = routingTable.index(index);
if (indexRoutingTable != null) {
final int shardCount = indexRoutingTable.size();
for (int i = 0; i < shardCount; i++) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(i);

ShardRouting primary = shardRoutingTable.primaryShard();
MonitoringDoc.Node primaryShardNode = null;
if (primary.assignedToNode()) {
// If the shard is assigned to a node, the shard monitoring document refers to this node
shardNode = convertNode(node.getTimestamp(), clusterState.getNodes().get(shard.currentNodeId()));
primaryShardNode = convertNode(node.getTimestamp(), clusterState.getNodes().get(primary.currentNodeId()));
}
results.add(new ShardMonitoringDoc(clusterUuid, timestamp, interval, primaryShardNode, primary, stateUUID, 0));

List<ShardRouting> replicas = shardRoutingTable.replicaShards();
for (int j = 0; j < replicas.size(); j++) {
ShardRouting replica = replicas.get(j);

MonitoringDoc.Node replicaShardNode = null;
if (replica.assignedToNode()) {
replicaShardNode = convertNode(
node.getTimestamp(),
clusterState.getNodes().get(replica.currentNodeId())
);
}
results.add(
new ShardMonitoringDoc(clusterUuid, timestamp, interval, replicaShardNode, replica, stateUUID, j + 1)
);
}
results.add(new ShardMonitoringDoc(clusterUuid, timestamp, interval, shardNode, shard, stateUUID));
}
}
}
}
}
return Collections.unmodifiableCollection(results);
}

private String[] expandIndexPattern(String[] indicesToMonitor, String[] indices) {
final Set<String> expandedIndices = new HashSet<>();

for (String indexOrPattern : indicesToMonitor) {
if (indexOrPattern.contains("*")) {
for (String index : indices) {
if (Regex.simpleMatch(indexOrPattern, index)) {
jbaiera marked this conversation as resolved.
Show resolved Hide resolved
expandedIndices.add(index);
}
}
} else {
expandedIndices.add(indexOrPattern);
}
}

return expandedIndices.toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.elasticsearch.xpack.monitoring.collector.shards;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
Expand All @@ -20,7 +22,10 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
Expand All @@ -30,6 +35,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -77,7 +83,12 @@ public void testDoCollect() throws Exception {
final String stateUUID = UUID.randomUUID().toString();
when(clusterState.stateUUID()).thenReturn(stateUUID);

final String[] indices = randomFrom(NONE, Strings.EMPTY_ARRAY, new String[] { "_all" }, new String[] { "_index*" });
final String[] indices = randomFrom(
NONE,
Strings.EMPTY_ARRAY,
new String[] { "_all" },
new String[] { "_index*", "_does-not-exist" }
);
withCollectionIndices(indices);

final RoutingTable routingTable = mockRoutingTable();
Expand Down Expand Up @@ -111,7 +122,11 @@ public void testDoCollect() throws Exception {
assertThat(document.getIntervalMillis(), equalTo(interval));
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), equalTo(ShardMonitoringDoc.TYPE));
assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUUID, document.getShardRouting())));
if (document.getShardRouting().primary()) {
assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUUID, document.getShardRouting(), 0)));
} else {
assertThat(document.getId(), matchesPattern("^[\\w-]+:(_current|_na)+:_index:s\\d:r[1-9]+[0-9]*"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, I don't know which index is being used for which monitoring document, so I'm falling back to just checking the format and that at least there isn't any replica r0 (since the 0 would be the primary).

}
assertThat(document.getClusterStateUUID(), equalTo(stateUUID));

if (document.getShardRouting().assignedToNode()) {
Expand All @@ -130,15 +145,60 @@ public void testDoCollect() throws Exception {
private static RoutingTable mockRoutingTable() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test setup here is starting to feel a bit messy, not sure what can be done about it.

final List<ShardRouting> allShards = new ArrayList<>();

final int nbShards = randomIntBetween(0, 10);
for (int i = 0; i < nbShards; i++) {
final int numberOfPrimaryShards = randomIntBetween(0, 10);
for (int i = 0; i < numberOfPrimaryShards; i++) {
ShardRoutingState state = randomFrom(STARTED, UNASSIGNED);
ShardId shardId = new ShardId("_index", randomAlphaOfLength(12), i);
allShards.add(TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, true, state));
ShardRouting primary = TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, true, state);
allShards.add(primary);
}

final int numberOfReplicaShards = randomIntBetween(0, 3);
for (int i = 0; i < numberOfPrimaryShards; i++) {
for (int j = 0; j < numberOfReplicaShards; j++) {
ShardRoutingState state = randomFrom(STARTED, UNASSIGNED);
ShardId shardId = new ShardId("_index", randomAlphaOfLength(12), i);
ShardRouting replica = TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, false, state);
allShards.add(replica);
}
}

final RoutingTable routingTable = mock(RoutingTable.class);
// _index* matches the test data above
when(routingTable.allShards("_index*")).thenReturn(allShards);
// _all is reserved to mean all indices in the routing table
when(routingTable.allShards("_all")).thenReturn(allShards);
// When collection indices is set to [], it's treated the same as "_all", so the key set of the routing table is used to grab the
// index names
when(routingTable.allShards("_index")).thenReturn(allShards);
miltonhultgren marked this conversation as resolved.
Show resolved Hide resolved
// This mock routing table only has the index named "_index", so if collection indices is set to ["_none"] no shards should be
// found.
when(routingTable.allShards("_none")).thenReturn(new ArrayList<>(0));

final IndexRoutingTable indexRoutingTable = mock(IndexRoutingTable.class);
final Map<String, IndexRoutingTable> indicesRouting = Map.of("_index", indexRoutingTable);
when(routingTable.indicesRouting()).thenReturn(indicesRouting);
when(routingTable.index("_index")).thenReturn(indexRoutingTable);

when(indexRoutingTable.size()).thenReturn(numberOfPrimaryShards);
for (int i = 0; i < numberOfPrimaryShards; i++) {
final IndexShardRoutingTable shardRoutingTable = mock(IndexShardRoutingTable.class);
when(indexRoutingTable.shard(i)).thenReturn(shardRoutingTable);
when(shardRoutingTable.primaryShard()).thenReturn(allShards.get(i));
List<ShardRouting> replicas = new ArrayList<>();
int replicaIndexStart = numberOfPrimaryShards + i * numberOfReplicaShards;
int replicaIndexEnd = replicaIndexStart + numberOfReplicaShards;
for (int j = replicaIndexStart; j < replicaIndexEnd; j++) {
replicas.add(allShards.get(j));
}
when(shardRoutingTable.replicaShards()).thenReturn(replicas);
}

// This is only used by the test to decide how many shards should be covered
when(routingTable.allShards()).thenReturn(allShards);

Collections.shuffle(allShards, new Random(numberOfPrimaryShards));

return routingTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ protected ShardMonitoringDoc createMonitoringDoc(
String type,
String id
) {
return new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, stateUuid);
return new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, stateUuid, shardRouting.primary() ? 0 : 1);
}

@Override
protected void assertFilteredMonitoringDoc(final ShardMonitoringDoc document) {
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), is(ShardMonitoringDoc.TYPE));
assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUuid, shardRouting)));
assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUuid, shardRouting, shardRouting.primary() ? 0 : 1)));

assertThat(document.getShardRouting(), is(shardRouting));
if (assignedToNode) {
Expand All @@ -82,31 +82,31 @@ protected Set<String> getExpectedXContentFilters() {
}

public void testConstructorShardRoutingMustNotBeNull() {
expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, null, stateUuid));
expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, null, stateUuid, 0));
}

public void testConstructorStateUuidMustNotBeNull() {
expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, null));
expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, null, 0));
}

public void testIdWithPrimaryShardAssigned() {
shardRouting = newShardRouting("_index_0", 123, "_node_0", randomAlphaOfLength(5), true, INITIALIZING);
assertEquals("_state_uuid_0:_node_0:_index_0:123:p", ShardMonitoringDoc.id("_state_uuid_0", shardRouting));
assertEquals("_state_uuid_0:_node_0:_index_0:s123:p", ShardMonitoringDoc.id("_state_uuid_0", shardRouting, 0));
}

public void testIdWithReplicaShardAssigned() {
shardRouting = newShardRouting("_index_1", 456, "_node_1", randomAlphaOfLength(5), false, INITIALIZING);
assertEquals("_state_uuid_1:_node_1:_index_1:456:r", ShardMonitoringDoc.id("_state_uuid_1", shardRouting));
assertEquals("_state_uuid_1:_node_1:_index_1:s456:r1", ShardMonitoringDoc.id("_state_uuid_1", shardRouting, 1));
}

public void testIdWithPrimaryShardUnassigned() {
shardRouting = newShardRouting("_index_2", 789, null, true, UNASSIGNED);
assertEquals("_state_uuid_2:_na:_index_2:789:p", ShardMonitoringDoc.id("_state_uuid_2", shardRouting));
assertEquals("_state_uuid_2:_na:_index_2:s789:p", ShardMonitoringDoc.id("_state_uuid_2", shardRouting, 0));
}

public void testIdWithReplicaShardUnassigned() {
shardRouting = newShardRouting("_index_3", 159, null, false, UNASSIGNED);
assertEquals("_state_uuid_3:_na:_index_3:159:r", ShardMonitoringDoc.id("_state_uuid_3", shardRouting));
assertEquals("_state_uuid_3:_na:_index_3:s159:r1", ShardMonitoringDoc.id("_state_uuid_3", shardRouting, 1));
}

@Override
Expand All @@ -119,7 +119,8 @@ public void testToXContent() throws IOException {
1506593717631L,
node,
shardRouting,
"_state_uuid"
"_state_uuid",
0
);

final BytesReference xContent = XContentHelper.toXContent(doc, XContentType.JSON, randomBoolean());
Expand Down