Skip to content

Commit

Permalink
Remove RoutingTable#shardsWithState(...) (elastic#78959) (elastic#78967)
Browse files Browse the repository at this point in the history
Replaced all usages of this method with: `RoutingNodes#shardsWithState(...)`
  • Loading branch information
martijnvg authored Oct 12, 2021
1 parent e0c3f77 commit e7d87dc
Show file tree
Hide file tree
Showing 27 changed files with 135 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void testTransientSettingsStillApplied() {

ClusterState state = client().admin().cluster().prepareState().get().getState();

for (ShardRouting shard : state.getRoutingTable().shardsWithState(ShardRoutingState.STARTED)) {
for (ShardRouting shard : state.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
String node = state.getRoutingNodes().node(shard.currentNodeId()).node().getName();
logger.info("--> shard on {} - {}", node, shard);
assertTrue("shard on " + node + " but should only be on the include node list: " +
Expand All @@ -258,7 +258,7 @@ public void testTransientSettingsStillApplied() {
// The transient settings still exist in the state
assertThat(state.metadata().transientSettings(), equalTo(exclude));

for (ShardRouting shard : state.getRoutingTable().shardsWithState(ShardRoutingState.STARTED)) {
for (ShardRouting shard : state.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
String node = state.getRoutingNodes().node(shard.currentNodeId()).node().getName();
logger.info("--> shard on {} - {}", node, shard);
assertTrue("shard on " + node + " but should only be on the include node list: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private void indexRandomData() throws Exception {

private String findNodeWithShard() {
ClusterState state = client().admin().cluster().prepareState().get().getState();
List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
List<ShardRouting> startedShards = state.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED);
Collections.shuffle(startedShards,random());
return state.nodes().get(startedShards.get(0).currentNodeId()).getName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {

// make sure nodeA has primary and nodeB has replica
ClusterState state = client().admin().cluster().prepareState().get().getState();
List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
List<ShardRouting> startedShards = state.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED);
assertThat(startedShards.size(), equalTo(2));
for (ShardRouting shardRouting : startedShards) {
if (shardRouting.primary()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private void buildRedIndex(int numShards) throws Exception {

assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
List<ShardRouting> unassigneds = clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED);
List<ShardRouting> unassigneds = clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED);
assertThat(unassigneds.size(), greaterThan(0));
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ public void testDataFileFailureDuringRestore() throws Exception {
assertEquals(1, clusterStateResponse.getState().getNodes().getDataNodes().size());
assertEquals(
restoreInfo.failedShards(),
clusterStateResponse.getState().getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size()
clusterStateResponse.getState().getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,6 @@ public boolean validate(Metadata metadata) {
return true;
}

public List<ShardRouting> shardsWithState(ShardRoutingState state) {
List<ShardRouting> shards = new ArrayList<>();
for (IndexRoutingTable indexRoutingTable : this) {
shards.addAll(indexRoutingTable.shardsWithState(state));
}
return shards;
}

/**
* All the shards (replicas) for all indices in this routing table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testNoDelayedUnassigned() throws Exception {
clusterState = ClusterState.builder(clusterState).nodes(nodes).build();
clusterState = allocationService.disassociateDeadNodes(clusterState, true, "reroute");
ClusterState newState = clusterState;
List<ShardRouting> unassignedShards = newState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED);
List<ShardRouting> unassignedShards = newState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED);
if (nodeAvailableForAllocation) {
assertThat(unassignedShards.size(), equalTo(0));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,31 +139,31 @@ public void testIndicesRouting() {
}

public void testShardsWithState() {
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size(), is(this.totalNumberOfShards));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), is(this.totalNumberOfShards));

initPrimaries();
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size(),
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(),
is(this.totalNumberOfShards - 2 * this.numberOfShards));
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.INITIALIZING).size(), is(2 * this.numberOfShards));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), is(2 * this.numberOfShards));

startInitializingShards(TEST_INDEX_1);
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.STARTED).size(), is(this.numberOfShards));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), is(this.numberOfShards));
int initializingExpected = this.numberOfShards + this.numberOfShards * this.numberOfReplicas;
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.INITIALIZING).size(), is(initializingExpected));
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size(),
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), is(initializingExpected));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(),
is(this.totalNumberOfShards - initializingExpected - this.numberOfShards));

startInitializingShards(TEST_INDEX_2);
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.STARTED).size(), is(2 * this.numberOfShards));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), is(2 * this.numberOfShards));
initializingExpected = 2 * this.numberOfShards * this.numberOfReplicas;
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.INITIALIZING).size(), is(initializingExpected));
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size(),
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), is(initializingExpected));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(),
is(this.totalNumberOfShards - initializingExpected - 2 * this.numberOfShards));

// now start all replicas too
startInitializingShards(TEST_INDEX_1);
startInitializingShards(TEST_INDEX_2);
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.STARTED).size(), is(this.totalNumberOfShards));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), is(this.totalNumberOfShards));
}

public void testActivePrimaryShardsGrouped() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
Expand Down Expand Up @@ -179,32 +180,35 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing

final ClusterState reroutedState1 = rerouteAndStartShards(allocationService, clusterState);
final RoutingTable routingTable1 = reroutedState1.routingTable();
final RoutingNodes routingNodes1 = reroutedState1.getRoutingNodes();
// the test harness only permits one recovery per node, so we must have allocated all the high-priority primaries and one of the
// medium-priority ones
assertThat(routingTable1.shardsWithState(ShardRoutingState.INITIALIZING), empty());
assertThat(routingTable1.shardsWithState(ShardRoutingState.RELOCATING), empty());
assertTrue(routingTable1.shardsWithState(ShardRoutingState.STARTED).stream().allMatch(ShardRouting::primary));
assertThat(routingNodes1.shardsWithState(ShardRoutingState.INITIALIZING), empty());
assertThat(routingNodes1.shardsWithState(ShardRoutingState.RELOCATING), empty());
assertTrue(routingNodes1.shardsWithState(ShardRoutingState.STARTED).stream().allMatch(ShardRouting::primary));
assertThat(routingTable1.index("highPriority").primaryShardsActive(), equalTo(2));
assertThat(routingTable1.index("mediumPriority").primaryShardsActive(), equalTo(1));
assertThat(routingTable1.index("lowPriority").shardsWithState(ShardRoutingState.STARTED), empty());
assertThat(routingTable1.index("invalid").shardsWithState(ShardRoutingState.STARTED), empty());

final ClusterState reroutedState2 = rerouteAndStartShards(allocationService, reroutedState1);
final RoutingTable routingTable2 = reroutedState2.routingTable();
final RoutingNodes routingNodes2 = reroutedState2.getRoutingNodes();
// this reroute starts the one remaining medium-priority primary and both of the low-priority ones, but no replicas
assertThat(routingTable2.shardsWithState(ShardRoutingState.INITIALIZING), empty());
assertThat(routingTable2.shardsWithState(ShardRoutingState.RELOCATING), empty());
assertTrue(routingTable2.shardsWithState(ShardRoutingState.STARTED).stream().allMatch(ShardRouting::primary));
assertThat(routingNodes2.shardsWithState(ShardRoutingState.INITIALIZING), empty());
assertThat(routingNodes2.shardsWithState(ShardRoutingState.RELOCATING), empty());
assertTrue(routingNodes2.shardsWithState(ShardRoutingState.STARTED).stream().allMatch(ShardRouting::primary));
assertTrue(routingTable2.index("highPriority").allPrimaryShardsActive());
assertTrue(routingTable2.index("mediumPriority").allPrimaryShardsActive());
assertTrue(routingTable2.index("lowPriority").allPrimaryShardsActive());
assertThat(routingTable2.index("invalid").shardsWithState(ShardRoutingState.STARTED), empty());

final ClusterState reroutedState3 = rerouteAndStartShards(allocationService, reroutedState2);
final RoutingTable routingTable3 = reroutedState3.routingTable();
final RoutingNodes routingNodes3 = reroutedState3.getRoutingNodes();
// this reroute starts the two medium-priority replicas since their allocator permits this
assertThat(routingTable3.shardsWithState(ShardRoutingState.INITIALIZING), empty());
assertThat(routingTable3.shardsWithState(ShardRoutingState.RELOCATING), empty());
assertThat(routingNodes3.shardsWithState(ShardRoutingState.INITIALIZING), empty());
assertThat(routingNodes3.shardsWithState(ShardRoutingState.RELOCATING), empty());
assertTrue(routingTable3.index("highPriority").allPrimaryShardsActive());
assertThat(routingTable3.index("mediumPriority").shardsWithState(ShardRoutingState.UNASSIGNED), empty());
assertTrue(routingTable3.index("lowPriority").allPrimaryShardsActive());
Expand Down Expand Up @@ -338,7 +342,7 @@ public int getNumberOfInFlightFetches() {
private static ClusterState rerouteAndStartShards(final AllocationService allocationService, final ClusterState clusterState) {
final ClusterState reroutedState = allocationService.reroute(clusterState, "test");
return allocationService.applyStartedShards(reroutedState,
reroutedState.routingTable().shardsWithState(ShardRoutingState.INITIALIZING));
reroutedState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected ClusterState allocateNew(ClusterState state) {

ClusterState clusterState = ClusterState.builder(state).metadata(metadata).routingTable(initialRoutingTable).build();
clusterState = strategy.reroute(clusterState, "reroute");
while (clusterState.routingTable().shardsWithState(INITIALIZING).isEmpty() == false) {
while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
clusterState = ESAllocationTestCase.startInitializingShardsAndReroute(strategy, clusterState);
}
Map<String, Integer> counts = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private ClusterState rebalance(ClusterState clusterState) {
clusterState = strategy.reroute(clusterState, "reroute");
int numRelocations = 0;
while (true) {
List<ShardRouting> initializing = clusterState.routingTable().shardsWithState(INITIALIZING);
List<ShardRouting> initializing = clusterState.getRoutingNodes().shardsWithState(INITIALIZING);
if (initializing.isEmpty()) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,28 @@ public void testClusterConcurrentRebalance() {
clusterState = startInitializingShardsAndReroute(strategy, clusterState);

// we only allow one relocation at a time
assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(7));
assertThat(clusterState.routingTable().shardsWithState(RELOCATING).size(), equalTo(3));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(7));
assertThat(clusterState.getRoutingNodes().shardsWithState(RELOCATING).size(), equalTo(3));

logger.info("finalize this session relocation, 3 more should relocate now");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);

// we only allow one relocation at a time
assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(7));
assertThat(clusterState.routingTable().shardsWithState(RELOCATING).size(), equalTo(3));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(7));
assertThat(clusterState.getRoutingNodes().shardsWithState(RELOCATING).size(), equalTo(3));

logger.info("finalize this session relocation, 2 more should relocate now");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);

// we only allow one relocation at a time
assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(8));
assertThat(clusterState.routingTable().shardsWithState(RELOCATING).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(8));
assertThat(clusterState.getRoutingNodes().shardsWithState(RELOCATING).size(), equalTo(2));

logger.info("finalize this session relocation, no more relocation");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);

// we only allow one relocation at a time
assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(10));
assertThat(clusterState.routingTable().shardsWithState(RELOCATING).size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(10));
assertThat(clusterState.getRoutingNodes().shardsWithState(RELOCATING).size(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public void testAutoReleaseIndices() {
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata).routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(newNormalNode("node1")).add(newNormalNode("node2"))).build(), allocation);
assertThat(clusterState.getRoutingTable().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));

final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpacesBuilder
= ImmutableOpenMap.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Long getShardSize(ShardRouting shardRouting) {

assertEquals(1, clusterState.getRoutingNodes().node("node1")
.numberOfShardsWithState(ShardRoutingState.INITIALIZING));
assertEquals(byteSize, clusterState.getRoutingTable()
assertEquals(byteSize, clusterState.getRoutingNodes()
.shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize());
logger.info("Start the primary shard");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
Expand All @@ -77,7 +77,7 @@ public Long getShardSize(ShardRouting shardRouting) {

assertEquals(1, clusterState.getRoutingNodes()
.node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING));
assertEquals(byteSize, clusterState.getRoutingTable()
assertEquals(byteSize, clusterState.getRoutingNodes()
.shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void testSingleShardMultipleAllocationFailures() {
nodeBuilder.add(newNode("node" + Integer.toString(i)));
}
clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build();
while (clusterState.routingTable().shardsWithState(UNASSIGNED).isEmpty() == false) {
while (clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).isEmpty() == false) {
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
}

Expand Down
Loading

0 comments on commit e7d87dc

Please sign in to comment.