Skip to content

Commit

Permalink
Fix autoexpand during node replace
Browse files Browse the repository at this point in the history
Prior to this change NodeReplacementAllocationDecider was unconditionally
skipping both replacement source and target nodes when calculation auto-expand
replicas. This is fixed by autoexpanding to the replacement node if source node
already had shards of the index

Backport of PR elastic#96281 amended for 7.17.x

Closes elastic#89527
  • Loading branch information
idegtiarenko authored and kingherc committed Aug 25, 2023
1 parent 0df52c8 commit f82db63
Show file tree
Hide file tree
Showing 6 changed files with 538 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,20 @@ public DiscoveryNode findByAddress(TransportAddress address) {
return null;
}

/**
* Check if a node with provided name exists
*
* @return {@code true} node identified with provided name exists or {@code false} otherwise
*/
public boolean hasByName(String name) {
for (DiscoveryNode node : nodes.values()) {
if (node.getName().equals(name)) {
return true;
}
}
return false;
}

/**
* Returns the version of the node with the oldest version in the cluster that is not a client node
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
import java.util.Map;
import java.util.Optional;

/**
* An allocation decider that ensures that all the shards allocated to the node scheduled for removal are relocated to the replacement node.
* It also ensures that auto-expands replicas are expanded to only the replacement source or target (not both at the same time)
* and only of the shards that were already present on the source node.
*/
public class NodeReplacementAllocationDecider extends AllocationDecider {

public static final String NAME = "node_replacement";
Expand All @@ -37,8 +42,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
Decision.Type.YES,
NAME,
"node [%s] is replacing node [%s], and may receive shards from it",
shardRouting.currentNodeId(),
node.nodeId()
node.nodeId(),
shardRouting.currentNodeId()
);
} else if (isReplacementSource(allocation, shardRouting.currentNodeId())) {
return Decision.single(
Expand Down Expand Up @@ -96,22 +101,54 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
return NO_REPLACEMENTS;
} else if (isReplacementTargetName(allocation, node.getName())) {
final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.getName());
return Decision.single(
Decision.Type.NO,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shards cannot auto expand to be on it until the replacement is complete",
node.getId(),
shutdown == null ? null : shutdown.getNodeId()
);
final String sourceNodeId = shutdown != null ? shutdown.getNodeId() : null;
final boolean hasShardsAllocatedOnSourceOrTarget = hasShardOnNode(indexMetadata, node.getId(), allocation)
|| (sourceNodeId != null && hasShardOnNode(indexMetadata, sourceNodeId, allocation));

if (hasShardsAllocatedOnSourceOrTarget) {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shard can auto expand to it as it was already present on the source node",
node.getId(),
sourceNodeId
);
} else {
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shards cannot auto expand to be on it until the replacement is complete",
node.getId(),
sourceNodeId
);
}
} else if (isReplacementSource(allocation, node.getId())) {
return Decision.single(
Decision.Type.NO,
NAME,
"node [%s] is being replaced by [%s], shards cannot auto expand to be on it",
node.getId(),
getReplacementName(allocation, node.getId())
);
final SingleNodeShutdownMetadata shutdown = allocation.metadata().nodeShutdowns().get(node.getId());
final String replacementNodeName = shutdown != null ? shutdown.getTargetNodeName() : null;
final boolean hasShardOnSource = hasShardOnNode(indexMetadata, node.getId(), allocation)
&& shutdown != null
&& allocation.nodes().hasByName(replacementNodeName) == false;

if (hasShardOnSource) {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is being replaced by [%s], shards can auto expand to be on it "
+ "while replacement node has not joined the cluster",
node.getId(),
replacementNodeName
);
} else {
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is being replaced by [%s], shards cannot auto expand to be on it",
node.getId(),
replacementNodeName
);
}
} else {
return Decision.single(
Decision.Type.YES,
Expand All @@ -121,6 +158,11 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
}
}

private static boolean hasShardOnNode(IndexMetadata indexMetadata, String nodeId, RoutingAllocation allocation) {
RoutingNode node = allocation.routingNodes().node(nodeId);
return node != null && node.numberOfOwningShardsForIndex(indexMetadata.getIndex()) >= 1;
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (replacementFromSourceToTarget(allocation, shardRouting.currentNodeId(), node.node().getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
node.getId()
);
case REPLACE:
if (allocation.nodes().hasByName(thisNodeShutdownMetadata.getTargetNodeName()) == false) {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is preparing to be removed from the cluster, but replacement is not yet present",
node.getId()
);
} else {
return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId());
}
case REMOVE:
return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId());
default:
Expand Down
Loading

0 comments on commit f82db63

Please sign in to comment.