Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save some RoutingNodes Instantiations #79941

33 changes: 30 additions & 3 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,21 @@ public ClusterState(
this.customs = customs;
this.wasReadFromDiff = wasReadFromDiff;
this.routingNodes = routingNodes;
assert routingNodes == null || routingNodes.equals(new RoutingNodes(this))
: "RoutingNodes [" + routingNodes + "] are not consistent with this cluster state [" + new RoutingNodes(this) + "]";
assert assertConsistentRoutingNodes(routingTable, nodes, routingNodes);
}

private static boolean assertConsistentRoutingNodes(
RoutingTable routingTable,
DiscoveryNodes nodes,
@Nullable RoutingNodes routingNodes
) {
if (routingNodes == null) {
return true;
}
final RoutingNodes expected = RoutingNodes.immutable(routingTable, nodes);
assert routingNodes.equals(expected)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

equals does not look like it compares all fields, for instance on ShardRouting. Also, it would be even nicer to verify that all the ShardRouting instances are the same?

But then again, that really does not change with this PR, so feel free to disregard this.

: "RoutingNodes [" + routingNodes + "] are not consistent with this cluster state [" + expected + "]";
return true;
}

public long term() {
Expand Down Expand Up @@ -257,10 +270,24 @@ public RoutingNodes getRoutingNodes() {
if (routingNodes != null) {
return routingNodes;
}
routingNodes = new RoutingNodes(this);
routingNodes = RoutingNodes.immutable(routingTable, nodes);
return routingNodes;
}

/**
* Returns a fresh mutable copy of the routing nodes view.
*/
public RoutingNodes mutableRoutingNodes() {
final RoutingNodes nodes = this.routingNodes;
// use the cheaper copy constructor if we already computed the routing nodes for this state.
if (nodes != null) {
return nodes.mutableCopy();
}
// we don't have any routing nodes for this state, likely because it's a temporary state in the reroute logic, don't compute an
// immutable copy that will never be used and instead directly build a mutable copy
return RoutingNodes.mutable(routingTable, this.nodes);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
assert invariant();
}

private RoutingNode(RoutingNode original) {
this.nodeId = original.nodeId;
this.node = original.node;
this.shards = new LinkedHashMap<>(original.shards);
this.relocatingShards = new LinkedHashSet<>(original.relocatingShards);
this.initializingShards = new LinkedHashSet<>(original.initializingShards);
this.shardsByIndex = new LinkedHashMap<>(original.shardsByIndex.size());
for (Map.Entry<Index, LinkedHashSet<ShardRouting>> entry : original.shardsByIndex.entrySet()) {
shardsByIndex.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
}
assert invariant();
}

RoutingNode copy() {
return new RoutingNode(this);
}

private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
final LinkedHashMap<ShardId, ShardRouting> shards = new LinkedHashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
Expand Down
132 changes: 103 additions & 29 deletions server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@

package org.elasticsearch.cluster.routing;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.service.MasterService;
Expand Down Expand Up @@ -44,8 +47,9 @@

/**
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
* It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
* or disallowing changes to its elements.
* It can be either initialized as mutable or immutable allowing or disallowing changes to its elements.
* (see {@link RoutingNodes#mutable(RoutingTable, DiscoveryNodes)}, {@link RoutingNodes#immutable(RoutingTable, DiscoveryNodes)},
* and {@link #mutableCopy()})
*
* The main methods used to update routing entries are:
* <ul>
Expand All @@ -57,11 +61,11 @@
*/
public class RoutingNodes implements Iterable<RoutingNode> {

private final Map<String, RoutingNode> nodesToShards = new HashMap<>();
private final Map<String, RoutingNode> nodesToShards;

private final UnassignedShards unassignedShards = new UnassignedShards(this);
private final UnassignedShards unassignedShards;

private final Map<ShardId, List<ShardRouting>> assignedShards = new HashMap<>();
private final Map<ShardId, List<ShardRouting>> assignedShards;

private final boolean readOnly;

Expand All @@ -75,21 +79,33 @@ public class RoutingNodes implements Iterable<RoutingNode> {

private int totalShardCount = 0;

private final Map<String, Set<String>> attributeValuesByAttribute = new HashMap<>();
private final Map<String, Recoveries> recoveriesPerNode = new HashMap<>();
private final Map<String, Set<String>> attributeValuesByAttribute;
private final Map<String, Recoveries> recoveriesPerNode;

/**
* Creates an immutable instance from the {@link RoutingTable} and {@link DiscoveryNodes} found in a cluster state. Used to initialize
* the routing nodes in {@link ClusterState#getRoutingNodes()}. This method should not be used directly, use
* {@link ClusterState#getRoutingNodes()} instead.
*/
public static RoutingNodes immutable(RoutingTable routingTable, DiscoveryNodes discoveryNodes) {
return new RoutingNodes(routingTable, discoveryNodes, true);
}

public RoutingNodes(ClusterState clusterState) {
this(clusterState, true);
public static RoutingNodes mutable(RoutingTable routingTable, DiscoveryNodes discoveryNodes) {
return new RoutingNodes(routingTable, discoveryNodes, false);
}

public RoutingNodes(ClusterState clusterState, boolean readOnly) {
private RoutingNodes(RoutingTable routingTable, DiscoveryNodes discoveryNodes, boolean readOnly) {
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();
this.recoveriesPerNode = new HashMap<>();
this.assignedShards = new HashMap<>();
this.unassignedShards = new UnassignedShards(this);
this.attributeValuesByAttribute = new HashMap<>();

Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>();
final Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>(discoveryNodes.getDataNodes().size());
// fill in the nodeToShards with the "live" nodes
for (DiscoveryNode node : clusterState.nodes().getDataNodes().values()) {
nodesToShards.put(node.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order
for (ObjectCursor<String> node : discoveryNodes.getDataNodes().keys()) {
nodesToShards.put(node.value, new LinkedHashMap<>()); // LinkedHashMap to preserve order
}

// fill in the inverse of node -> shards allocated
Expand All @@ -104,11 +120,9 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
// by the ShardId, as this is common for primary and replicas.
// A replica Set might have one (and not more) replicas with the state of RELOCATING.
if (shard.assignedToNode()) {
Map<ShardId, ShardRouting> entries = nodesToShards.computeIfAbsent(
shard.currentNodeId(),
k -> new LinkedHashMap<>()
); // LinkedHashMap to preserve order
ShardRouting previousValue = entries.put(shard.shardId(), shard);
// LinkedHashMap to preserve order
ShardRouting previousValue = nodesToShards.computeIfAbsent(shard.currentNodeId(), k -> new LinkedHashMap<>())
.put(shard.shardId(), shard);
if (previousValue != null) {
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
}
Expand All @@ -118,13 +132,12 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
}
if (shard.relocating()) {
relocatingShards++;
// LinkedHashMap to preserve order.
// Add the counterpart shard with relocatingNodeId reflecting the source from which
// it's relocating from.
entries = nodesToShards.computeIfAbsent(shard.relocatingNodeId(), k -> new LinkedHashMap<>());
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
addInitialRecovery(targetShardRouting, indexShard.primary);
previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting);
// LinkedHashMap to preserve order.
// Add the counterpart shard with relocatingNodeId reflecting the source from which it's relocating from.
previousValue = nodesToShards.computeIfAbsent(shard.relocatingNodeId(), k -> new LinkedHashMap<>())
.put(targetShardRouting.shardId(), targetShardRouting);
if (previousValue != null) {
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
}
Expand All @@ -142,12 +155,50 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
}
}
}
this.nodesToShards = new HashMap<>(nodesToShards.size());
for (Map.Entry<String, LinkedHashMap<ShardId, ShardRouting>> entry : nodesToShards.entrySet()) {
String nodeId = entry.getKey();
this.nodesToShards.put(nodeId, new RoutingNode(nodeId, clusterState.nodes().get(nodeId), entry.getValue()));
this.nodesToShards.put(nodeId, new RoutingNode(nodeId, discoveryNodes.get(nodeId), entry.getValue()));
}
}

private RoutingNodes(RoutingNodes routingNodes) {
// we should not call this on mutable instances, it's still expensive to create the copy and callers should instead mutate a single
// instance
assert routingNodes.readOnly : "tried to create a mutable copy from a mutable instance";
this.readOnly = false;
this.nodesToShards = new HashMap<>(routingNodes.nodesToShards.size());
for (Map.Entry<String, RoutingNode> entry : routingNodes.nodesToShards.entrySet()) {
this.nodesToShards.put(entry.getKey(), entry.getValue().copy());
}
this.assignedShards = new HashMap<>(routingNodes.assignedShards.size());
for (Map.Entry<ShardId, List<ShardRouting>> entry : routingNodes.assignedShards.entrySet()) {
this.assignedShards.put(entry.getKey(), new ArrayList<>(entry.getValue()));
}
this.unassignedShards = routingNodes.unassignedShards.copyFor(this);

this.inactivePrimaryCount = routingNodes.inactivePrimaryCount;
this.inactiveShardCount = routingNodes.inactiveShardCount;
this.relocatingShards = routingNodes.relocatingShards;
this.activeShardCount = routingNodes.activeShardCount;
this.totalShardCount = routingNodes.totalShardCount;
this.attributeValuesByAttribute = new HashMap<>(routingNodes.attributeValuesByAttribute.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a lazy cache of some immutable properties of the immutable set of nodes so it's never meaningfully mutated, so really we could just re-use the same map, or else instantiate it as an empty map. Also if it's only used in AwarenessAllocationDecider it should be empty on immutable instances anyway. But maybe it's nicer to explicitly copy it here anyway just for clarity.

Maybe we should move this to DiscoveryNodes so it doesn't get invalidated on every cluster state update too. Probably not a big deal, just thinking out loud here.

for (Map.Entry<String, Set<String>> entry : routingNodes.attributeValuesByAttribute.entrySet()) {
this.attributeValuesByAttribute.put(entry.getKey(), new HashSet<>(entry.getValue()));
}
this.recoveriesPerNode = new HashMap<>(routingNodes.recoveriesPerNode.size());
for (Map.Entry<String, Recoveries> entry : routingNodes.recoveriesPerNode.entrySet()) {
this.recoveriesPerNode.put(entry.getKey(), entry.getValue().copy());
}
}

/**
* @return a mutable copy of this instance
*/
public RoutingNodes mutableCopy() {
return new RoutingNodes(this);
}

private void addRecovery(ShardRouting routing) {
updateRecoveryCounts(routing, true, findAssignedPrimaryIfPeerRecovery(routing));
}
Expand Down Expand Up @@ -864,13 +915,29 @@ public static final class UnassignedShards implements Iterable<ShardRouting> {
private final List<ShardRouting> unassigned;
private final List<ShardRouting> ignored;

private int primaries = 0;
private int ignoredPrimaries = 0;
private int primaries;
private int ignoredPrimaries;

public UnassignedShards(RoutingNodes nodes) {
this(nodes, new ArrayList<>(), new ArrayList<>(), 0, 0);
}

private UnassignedShards(
RoutingNodes nodes,
List<ShardRouting> unassigned,
List<ShardRouting> ignored,
int primaries,
int ignoredPrimaries
) {
this.nodes = nodes;
unassigned = new ArrayList<>();
ignored = new ArrayList<>();
this.unassigned = unassigned;
this.ignored = ignored;
this.primaries = primaries;
this.ignoredPrimaries = ignoredPrimaries;
}

public UnassignedShards copyFor(RoutingNodes newNodes) {
return new UnassignedShards(newNodes, new ArrayList<>(unassigned), new ArrayList<>(ignored), primaries, ignoredPrimaries);
}

public void add(ShardRouting shardRouting) {
Expand Down Expand Up @@ -1277,6 +1344,13 @@ private static final class Recoveries {
private int incoming = 0;
private int outgoing = 0;

public Recoveries copy() {
final Recoveries copy = new Recoveries();
copy.incoming = incoming;
copy.outgoing = outgoing;
return copy;
}

void addOutgoing(int howMany) {
assert outgoing + howMany >= 0 : outgoing + howMany + " must be >= 0";
outgoing += howMany;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<RoutingTable> {

public static final RoutingTable EMPTY_ROUTING_TABLE = builder().build();
public static final RoutingTable EMPTY_ROUTING_TABLE = new RoutingTable(0, ImmutableOpenMap.of());

private final long version;

Expand Down Expand Up @@ -407,7 +407,6 @@ public Builder(RoutingTable routingTable) {
}
}

@SuppressWarnings("unchecked")
public Builder updateNodes(long version, RoutingNodes routingNodes) {
// this is being called without pre initializing the routing table, so we must copy over the version as well
this.version = version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.ESLogMessage;
import org.elasticsearch.gateway.GatewayAllocator;
Expand Down Expand Up @@ -316,7 +317,7 @@ public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
final Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
for (Map.Entry<Integer, List<String>> entry : autoExpandReplicaChanges.entrySet()) {
final int numberOfReplicas = entry.getKey();
final String[] indices = entry.getValue().toArray(new String[entry.getValue().size()]);
final String[] indices = entry.getValue().toArray(Strings.EMPTY_ARRAY);
// we do *not* update the in sync allocation ids as they will be removed upon the first index
// operation which make these copies stale
routingTableBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
Expand Down Expand Up @@ -614,7 +615,7 @@ private void applyStartedShards(RoutingAllocation routingAllocation, List<ShardR
* Create a mutable {@link RoutingNodes}. This is a costly operation so this must only be called once!
*/
private RoutingNodes getMutableRoutingNodes(ClusterState clusterState) {
return new RoutingNodes(clusterState, false);
return clusterState.mutableRoutingNodes();
}

/** override this to control time based decisions during allocation */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
Expand Down Expand Up @@ -734,7 +733,7 @@ public void testMoveShardToNonDataNode() {
MoveAllocationCommand command = new MoveAllocationCommand(index.getName(), 0, "node1", "node2");
RoutingAllocation routingAllocation = new RoutingAllocation(
new AllocationDeciders(Collections.emptyList()),
new RoutingNodes(clusterState, false),
clusterState.mutableRoutingNodes(),
clusterState,
ClusterInfo.EMPTY,
SnapshotShardSizeInfo.EMPTY,
Expand Down Expand Up @@ -801,7 +800,7 @@ public void testMoveShardFromNonDataNode() {
MoveAllocationCommand command = new MoveAllocationCommand(index.getName(), 0, "node2", "node1");
RoutingAllocation routingAllocation = new RoutingAllocation(
new AllocationDeciders(Collections.emptyList()),
new RoutingNodes(clusterState, false),
clusterState.mutableRoutingNodes(),
clusterState,
ClusterInfo.EMPTY,
SnapshotShardSizeInfo.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
private RoutingAllocation newRoutingAllocation(AllocationDeciders deciders, ClusterState state) {
RoutingAllocation allocation = new RoutingAllocation(
deciders,
new RoutingNodes(state, false),
state.mutableRoutingNodes(),
state,
ClusterInfo.EMPTY,
SnapshotShardSizeInfo.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ public void testMessages() {
);

final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.AbstractRoutingChangesObserver();
final RoutingNodes routingNodes = new RoutingNodes(clusterState, false);
final RoutingNodes routingNodes = clusterState.mutableRoutingNodes();
final ShardRouting startedPrimary = routingNodes.startShard(
logger,
routingNodes.initializeShard(primaryShard, "newNode", null, 0, routingChangesObserver),
Expand Down
Loading