From f82db63b1ddb98cf9b4fea4754d93e08416ca3dd Mon Sep 17 00:00:00 2001 From: Ievgen Degtiarenko Date: Wed, 7 Jun 2023 10:49:06 +0200 Subject: [PATCH] Fix autoexpand during node replace Prior to this change NodeReplacementAllocationDecider was unconditionally skipping both replacement source and target nodes when calculation auto-expand replicas. This is fixed by autoexpanding to the replacement node if source node already had shards of the index Backport of PR #96281 amended for 7.17.x Closes #89527 --- .../cluster/node/DiscoveryNodes.java | 14 + .../NodeReplacementAllocationDecider.java | 76 +++- .../NodeShutdownAllocationDecider.java | 10 + ...NodeReplacementAllocationDeciderTests.java | 362 ++++++++++++++---- .../NodeShutdownAllocationDeciderTests.java | 152 +++++--- .../xpack/shutdown/NodeShutdownShardsIT.java | 92 +++-- 6 files changed, 538 insertions(+), 168 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index b6c99e17785df..c1864229c1955 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -277,6 +277,20 @@ public DiscoveryNode findByAddress(TransportAddress address) { return null; } + /** + * Check if a node with provided name exists + * + * @return {@code true} node identified with provided name exists or {@code false} otherwise + */ + public boolean hasByName(String name) { + for (DiscoveryNode node : nodes.values()) { + if (node.getName().equals(name)) { + return true; + } + } + return false; + } + /** * Returns the version of the node with the oldest version in the cluster that is not a client node * diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java index dbaecd67cf429..837ecb27e24fd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java @@ -18,6 +18,11 @@ import java.util.Map; import java.util.Optional; +/** + * An allocation decider that ensures that all the shards allocated to the node scheduled for removal are relocated to the replacement node. + * It also ensures that auto-expands replicas are expanded to only the replacement source or target (not both at the same time) + * and only of the shards that were already present on the source node. + */ public class NodeReplacementAllocationDecider extends AllocationDecider { public static final String NAME = "node_replacement"; @@ -37,8 +42,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing Decision.Type.YES, NAME, "node [%s] is replacing node [%s], and may receive shards from it", - shardRouting.currentNodeId(), - node.nodeId() + node.nodeId(), + shardRouting.currentNodeId() ); } else if (isReplacementSource(allocation, shardRouting.currentNodeId())) { return Decision.single( @@ -96,22 +101,54 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod return NO_REPLACEMENTS; } else if (isReplacementTargetName(allocation, node.getName())) { final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.getName()); - return Decision.single( - Decision.Type.NO, - NAME, - "node [%s] is a node replacement target for node [%s], " - + "shards cannot auto expand to be on it until the replacement is complete", - node.getId(), - shutdown == null ? null : shutdown.getNodeId() - ); + final String sourceNodeId = shutdown != null ? shutdown.getNodeId() : null; + final boolean hasShardsAllocatedOnSourceOrTarget = hasShardOnNode(indexMetadata, node.getId(), allocation) + || (sourceNodeId != null && hasShardOnNode(indexMetadata, sourceNodeId, allocation)); + + if (hasShardsAllocatedOnSourceOrTarget) { + return allocation.decision( + Decision.YES, + NAME, + "node [%s] is a node replacement target for node [%s], " + + "shard can auto expand to it as it was already present on the source node", + node.getId(), + sourceNodeId + ); + } else { + return allocation.decision( + Decision.NO, + NAME, + "node [%s] is a node replacement target for node [%s], " + + "shards cannot auto expand to be on it until the replacement is complete", + node.getId(), + sourceNodeId + ); + } } else if (isReplacementSource(allocation, node.getId())) { - return Decision.single( - Decision.Type.NO, - NAME, - "node [%s] is being replaced by [%s], shards cannot auto expand to be on it", - node.getId(), - getReplacementName(allocation, node.getId()) - ); + final SingleNodeShutdownMetadata shutdown = allocation.metadata().nodeShutdowns().get(node.getId()); + final String replacementNodeName = shutdown != null ? shutdown.getTargetNodeName() : null; + final boolean hasShardOnSource = hasShardOnNode(indexMetadata, node.getId(), allocation) + && shutdown != null + && allocation.nodes().hasByName(replacementNodeName) == false; + + if (hasShardOnSource) { + return allocation.decision( + Decision.YES, + NAME, + "node [%s] is being replaced by [%s], shards can auto expand to be on it " + + "while replacement node has not joined the cluster", + node.getId(), + replacementNodeName + ); + } else { + return allocation.decision( + Decision.NO, + NAME, + "node [%s] is being replaced by [%s], shards cannot auto expand to be on it", + node.getId(), + replacementNodeName + ); + } } else { return Decision.single( Decision.Type.YES, @@ -121,6 +158,11 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod } } + private static boolean hasShardOnNode(IndexMetadata indexMetadata, String nodeId, RoutingAllocation allocation) { + RoutingNode node = allocation.routingNodes().node(nodeId); + return node != null && node.numberOfOwningShardsForIndex(indexMetadata.getIndex()) >= 1; + } + @Override public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (replacementFromSourceToTarget(allocation, shardRouting.currentNodeId(), node.node().getName())) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDecider.java index 020d9af349500..29e449d1ec665 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDecider.java @@ -100,6 +100,16 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod node.getId() ); case REPLACE: + if (allocation.nodes().hasByName(thisNodeShutdownMetadata.getTargetNodeName()) == false) { + return allocation.decision( + Decision.YES, + NAME, + "node [%s] is preparing to be removed from the cluster, but replacement is not yet present", + node.getId() + ); + } else { + return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId()); + } case REMOVE: return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId()); default: diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java index bd22bfe215ff4..6754bde46716f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.EmptyClusterInfoService; @@ -19,8 +20,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -37,6 +40,9 @@ import java.util.HashMap; import static org.hamcrest.Matchers.containsString; +import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.equalTo; public class NodeReplacementAllocationDeciderTests extends ESAllocationTestCase { @@ -85,25 +91,18 @@ public void testNoReplacements() { .nodes(DiscoveryNodes.builder().add(NODE_A).add(NODE_B).add(NODE_C).build()) .build(); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + RoutingAllocation allocation = createRoutingAllocation(state); DiscoveryNode node = randomFrom(NODE_A, NODE_B, NODE_C); RoutingNode routingNode = new RoutingNode(node.getId(), node, shard); - allocation.debugDecision(true); - - Decision decision = decider.canAllocate(shard, routingNode, allocation); - assertThat(decision.type(), equalTo(Decision.Type.YES)); - assertThat(decision.getExplanation(), equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS.getExplanation())); - decision = decider.canRemain(shard, routingNode, allocation); - assertThat(decision.type(), equalTo(Decision.Type.YES)); - assertThat(decision.getExplanation(), equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS.getExplanation())); + assertThat(decider.canAllocate(shard, routingNode, allocation), equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS)); + assertThat(decider.canRemain(shard, routingNode, allocation), equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS)); } public void testCanForceAllocate() { - ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), NODE_A.getId(), NODE_B.getName()); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + ClusterState state = prepareState(NODE_A.getId(), NODE_B.getName()); + RoutingAllocation allocation = createRoutingAllocation(state); RoutingNode routingNode = new RoutingNode(NODE_A.getId(), NODE_A, shard); - allocation.debugDecision(true); ShardRouting assignedShard = ShardRouting.newUnassigned( new ShardId("myindex", "myindex", 0), @@ -114,39 +113,33 @@ public void testCanForceAllocate() { assignedShard = assignedShard.initialize(NODE_A.getId(), null, 1); assignedShard = assignedShard.moveToStarted(); - Decision decision = decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation); - assertThat(decision.type(), equalTo(Decision.Type.NO)); - assertThat( - decision.getExplanation(), - equalTo("shard is not on the source of a node replacement relocated to the replacement target") + assertThatDecision( + decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation), + Decision.Type.NO, + "shard is not on the source of a node replacement relocated to the replacement target" ); routingNode = new RoutingNode(NODE_B.getId(), NODE_B, assignedShard); - decision = decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation); - assertThat(decision.type(), equalTo(Decision.Type.YES)); - assertThat( - decision.getExplanation(), - equalTo( - "node [" + NODE_A.getId() + "] is being replaced by node [" + NODE_B.getId() + "], and can be force vacated to the target" - ) + assertThatDecision( + decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation), + Decision.Type.YES, + "node [" + NODE_A.getId() + "] is being replaced by node [" + NODE_B.getId() + "], and can be force vacated to the target" ); routingNode = new RoutingNode(NODE_C.getId(), NODE_C, assignedShard); - decision = decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation); - assertThat(decision.type(), equalTo(Decision.Type.NO)); - assertThat( - decision.getExplanation(), - equalTo("shard is not on the source of a node replacement relocated to the replacement target") + assertThatDecision( + decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation), + Decision.Type.NO, + "shard is not on the source of a node replacement relocated to the replacement target" ); } public void testCannotRemainOnReplacedNode() { - ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), NODE_A.getId(), NODE_B.getName()); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + ClusterState state = prepareState(NODE_A.getId(), NODE_B.getName()); + RoutingAllocation allocation = createRoutingAllocation(state); RoutingNode routingNode = new RoutingNode(NODE_A.getId(), NODE_A, shard); - allocation.debugDecision(true); Decision decision = decider.canRemain(shard, routingNode, allocation); assertThat(decision.type(), equalTo(Decision.Type.NO)); @@ -169,62 +162,303 @@ public void testCannotRemainOnReplacedNode() { } public void testCanAllocateToNeitherSourceNorTarget() { - ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), NODE_A.getId(), NODE_B.getName()); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + ClusterState state = prepareState(NODE_A.getId(), NODE_B.getName()); + RoutingAllocation allocation = createRoutingAllocation(state); RoutingNode routingNode = new RoutingNode(NODE_A.getId(), NODE_A, shard); - allocation.debugDecision(true); ShardRouting testShard = this.shard; if (randomBoolean()) { testShard = shard.initialize(NODE_C.getId(), null, 1); testShard = testShard.moveToStarted(); } - Decision decision = decider.canAllocate(testShard, routingNode, allocation); - assertThat(decision.type(), equalTo(Decision.Type.NO)); - assertThat( - decision.getExplanation(), - equalTo("node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getName() + "], so no data may be allocated to it") + assertThatDecision( + decider.canAllocate(testShard, routingNode, allocation), + Decision.Type.NO, + "node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getName() + "], so no data may be allocated to it" ); routingNode = new RoutingNode(NODE_B.getId(), NODE_B, testShard); - decision = decider.canAllocate(testShard, routingNode, allocation); - assertThat(decision.type(), equalTo(Decision.Type.NO)); - assertThat( - decision.getExplanation(), - equalTo( - "node [" - + NODE_B.getId() - + "] is replacing the vacating node [" - + NODE_A.getId() - + "], only data currently allocated " - + "to the source node may be allocated to it until the replacement is complete" - ) + assertThatDecision( + decider.canAllocate(testShard, routingNode, allocation), + Decision.Type.NO, + "node [" + + NODE_B.getId() + + "] is replacing the vacating node [" + + NODE_A.getId() + + "], only data currently allocated " + + "to the source node may be allocated to it until the replacement is complete" ); routingNode = new RoutingNode(NODE_C.getId(), NODE_C, testShard); - decision = decider.canAllocate(testShard, routingNode, allocation); + Decision decision = decider.canAllocate(testShard, routingNode, allocation); assertThat(decision.getExplanation(), decision.type(), equalTo(Decision.Type.YES)); assertThat(decision.getExplanation(), containsString("neither the source nor target node are part of an ongoing node replacement")); } - private ClusterState prepareState(ClusterState initialState, String sourceNodeId, String targetNodeName) { - final SingleNodeShutdownMetadata nodeShutdownMetadata = SingleNodeShutdownMetadata.builder() - .setNodeId(sourceNodeId) - .setTargetNodeName(targetNodeName) - .setType(SingleNodeShutdownMetadata.Type.REPLACE) - .setReason(this.getTestName()) - .setStartedAtMillis(1L) + public void testShouldNotAutoExpandReplicasDuringUnrelatedNodeReplacement() { + + IndexMetadata indexMetadata = IndexMetadata.builder(idxName) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .build() + ) + .build(); + ShardId shardId = new ShardId(indexMetadata.getIndex(), 0); + + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(NODE_A).add(NODE_C).build()) + .metadata( + Metadata.builder() + .put(IndexMetadata.builder(indexMetadata)) + .putCustom(NodesShutdownMetadata.TYPE, createNodeShutdownReplacementMetadata(NODE_A.getId(), NODE_B.getName())) + ) + .routingTable( + RoutingTable.builder() + .add( + IndexRoutingTable.builder(indexMetadata.getIndex()) + .addShard(newShardRouting(shardId, NODE_C.getId(), true, STARTED)) + .build() + ) + .build() + ) .build(); - NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(new HashMap<>()).putSingleNodeMetadata( - nodeShutdownMetadata + + // before replacement node has joined + { + RoutingAllocation allocation = createRoutingAllocation(state); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_A, allocation), + Decision.Type.NO, + "node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getId() + "], shards cannot auto expand to be on it" + ); + assertThat(allocation.nodes().hasByName(NODE_B.getName()), equalTo(false)); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_B, allocation), + Decision.Type.NO, + "node [" + + NODE_B.getId() + + "] is a node replacement target for node [" + + NODE_A.getId() + + "], " + + "shards cannot auto expand to be on it until the replacement is complete" + + ); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_C, allocation), + Decision.Type.YES, + "node is not part of a node replacement, so shards may be auto expanded onto it" + ); + } + + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder().add(NODE_A).add(NODE_B).add(NODE_C).build()).build(); + + // after replacement node has joined + { + RoutingAllocation allocation = createRoutingAllocation(state); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_A, allocation), + Decision.Type.NO, + "node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getId() + "], shards cannot auto expand to be on it" + ); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_B, allocation), + Decision.Type.NO, + "node [" + + NODE_B.getId() + + "] is a node replacement target for node [" + + NODE_A.getId() + + "], " + + "shards cannot auto expand to be on it until the replacement is complete" + + ); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_C, allocation), + Decision.Type.YES, + "node is not part of a node replacement, so shards may be auto expanded onto it" + ); + } + } + + public void testShouldNotContractAutoExpandReplicasDuringNodeReplacement() { + + IndexMetadata indexMetadata = IndexMetadata.builder(idxName) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .build() + ) + .build(); + ShardId shardId = new ShardId(indexMetadata.getIndex(), 0); + + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(NODE_A).add(NODE_C).build()) + .metadata(Metadata.builder().put(IndexMetadata.builder(indexMetadata))) + .routingTable( + RoutingTable.builder() + .add( + IndexRoutingTable.builder(indexMetadata.getIndex()) + .addShard(newShardRouting(shardId, NODE_A.getId(), true, STARTED)) + .addShard(newShardRouting(shardId, NODE_C.getId(), false, STARTED)) + .build() + ) + .build() + ) + .build(); + + // index is already allocated on both nodes + { + RoutingAllocation allocation = createRoutingAllocation(state); + assertThat( + decider.shouldAutoExpandToNode(indexMetadata, NODE_A, allocation), + equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS) + ); + assertThat("node-b has not joined yet", allocation.nodes().hasByName(NODE_B.getName()), equalTo(false)); + assertThat( + decider.shouldAutoExpandToNode(indexMetadata, NODE_C, allocation), + equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS) + ); + } + + // when registering node replacement + state = ClusterState.builder(state) + .metadata( + Metadata.builder(state.metadata()) + .putCustom(NodesShutdownMetadata.TYPE, createNodeShutdownReplacementMetadata(NODE_A.getId(), NODE_B.getName())) + .build() + ) + .build(); + { + RoutingAllocation allocation = createRoutingAllocation(state); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_A, allocation), + Decision.Type.YES, + "node [" + + NODE_A.getId() + + "] is being replaced by [" + + NODE_B.getId() + + "], shards can auto expand to be on it " + + "while replacement node has not joined the cluster" + ); + assertThat("node-b has not joined yet", allocation.nodes().hasByName(NODE_B.getName()), equalTo(false)); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_C, allocation), + Decision.Type.YES, + "node is not part of a node replacement, so shards may be auto expanded onto it" + ); + } + + // when starting node replacement + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder().add(NODE_A).add(NODE_B).add(NODE_C).build()).build(); + { + RoutingAllocation allocation = createRoutingAllocation(state); + assertThatDecision( + decider.canAllocate( + allocation.routingNodes().node(NODE_A.getId()).getByShardId(shardId), + allocation.routingNodes().node(NODE_B.getId()), + allocation + ), + Decision.Type.YES, + "node [" + NODE_B.getId() + "] is replacing node [" + NODE_A.getId() + "], and may receive shards from it" + ); + assertThatAutoExpandReplicasDidNotContract(indexMetadata, allocation); + } + + // when index is relocating + state = ClusterState.builder(state) + .routingTable( + RoutingTable.builder() + .add( + IndexRoutingTable.builder(indexMetadata.getIndex()) + .addShard(newShardRouting(shardId, NODE_A.getId(), NODE_B.getId(), true, RELOCATING)) + .addShard(newShardRouting(shardId, NODE_C.getId(), false, STARTED)) + .build() + ) + .build() + ) + .build(); + assertThatAutoExpandReplicasDidNotContract(indexMetadata, createRoutingAllocation(state)); + + // when index is relocated + state = ClusterState.builder(state) + .routingTable( + RoutingTable.builder() + .add( + IndexRoutingTable.builder(indexMetadata.getIndex()) + .addShard(newShardRouting(shardId, NODE_B.getId(), true, STARTED)) + .addShard(newShardRouting(shardId, NODE_C.getId(), false, STARTED)) + .build() + ) + .build() + ) + .build(); + assertThatAutoExpandReplicasDidNotContract(indexMetadata, createRoutingAllocation(state)); + + // when source node is removed + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder().add(NODE_B).add(NODE_C).build()).build(); + assertThatAutoExpandReplicasDidNotContract(indexMetadata, createRoutingAllocation(state)); + } + + private void assertThatAutoExpandReplicasDidNotContract(IndexMetadata indexMetadata, RoutingAllocation allocation) { + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_A, allocation), + Decision.Type.NO, + "node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getId() + "], shards cannot auto expand to be on it" + ); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_B, allocation), + Decision.Type.YES, + "node [" + + NODE_B.getId() + + "] is a node replacement target for node [" + + NODE_A.getId() + + "], " + + "shard can auto expand to it as it was already present on the source node" + ); + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, NODE_C, allocation), + Decision.Type.YES, + "none of the ongoing node replacements relate to the allocation of this shard" ); - return ClusterState.builder(initialState) + } + + private ClusterState prepareState(String sourceNodeId, String targetNodeName) { + return ClusterState.builder(ClusterName.DEFAULT) .nodes(DiscoveryNodes.builder().add(NODE_A).add(NODE_B).add(NODE_C).build()) .metadata( - Metadata.builder().put(IndexMetadata.builder(indexMetadata)).putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata) + Metadata.builder() + .put(IndexMetadata.builder(indexMetadata)) + .putCustom(NodesShutdownMetadata.TYPE, createNodeShutdownReplacementMetadata(sourceNodeId, targetNodeName)) ) .build(); } + + private NodesShutdownMetadata createNodeShutdownReplacementMetadata(String sourceNodeId, String targetNodeName) { + return new NodesShutdownMetadata(new HashMap<>()).putSingleNodeMetadata( + SingleNodeShutdownMetadata.builder() + .setNodeId(sourceNodeId) + .setTargetNodeName(targetNodeName) + .setType(SingleNodeShutdownMetadata.Type.REPLACE) + .setReason(this.getTestName()) + .setStartedAtMillis(1L) + .build() + ); + } + + private RoutingAllocation createRoutingAllocation(ClusterState state) { + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + allocation.debugDecision(true); + return allocation; + } + + private static void assertThatDecision(Decision decision, Decision.Type type, String explanation) { + assertThat(decision.type(), equalTo(type)); + assertThat(decision.getExplanation(), equalTo(explanation)); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDeciderTests.java index bddae13b156f3..9b0d2fbf3379a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeShutdownAllocationDeciderTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.EmptyClusterInfoService; @@ -77,13 +78,9 @@ public class NodeShutdownAllocationDeciderTests extends ESAllocationTestCase { .build(); public void testCanAllocateShardsToRestartingNode() { - ClusterState state = prepareState( - service.reroute(ClusterState.EMPTY_STATE, "initial state"), - SingleNodeShutdownMetadata.Type.RESTART - ); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + ClusterState state = prepareState(SingleNodeShutdownMetadata.Type.RESTART); + RoutingAllocation allocation = createRoutingAllocation(state); RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard); - allocation.debugDecision(true); Decision decision = decider.canAllocate(shard, routingNode, allocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); @@ -94,13 +91,9 @@ public void testCanAllocateShardsToRestartingNode() { } public void testCannotAllocateShardsToRemovingNode() { - ClusterState state = prepareState( - service.reroute(ClusterState.EMPTY_STATE, "initial state"), - randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE) - ); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + ClusterState state = prepareState(randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE)); + RoutingAllocation allocation = createRoutingAllocation(state); RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard); - allocation.debugDecision(true); Decision decision = decider.canAllocate(shard, routingNode, allocation); assertThat(decision.type(), equalTo(Decision.Type.NO)); @@ -108,13 +101,9 @@ public void testCannotAllocateShardsToRemovingNode() { } public void testShardsCanRemainOnRestartingNode() { - ClusterState state = prepareState( - service.reroute(ClusterState.EMPTY_STATE, "initial state"), - SingleNodeShutdownMetadata.Type.RESTART - ); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + ClusterState state = prepareState(SingleNodeShutdownMetadata.Type.RESTART); + RoutingAllocation allocation = createRoutingAllocation(state); RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard); - allocation.debugDecision(true); Decision decision = decider.canRemain(shard, routingNode, allocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); @@ -125,13 +114,9 @@ public void testShardsCanRemainOnRestartingNode() { } public void testShardsCannotRemainOnRemovingNode() { - ClusterState state = prepareState( - service.reroute(ClusterState.EMPTY_STATE, "initial state"), - randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE) - ); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + ClusterState state = prepareState(randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE)); + RoutingAllocation allocation = createRoutingAllocation(state); RoutingNode routingNode = new RoutingNode(DATA_NODE.getId(), DATA_NODE, shard); - allocation.debugDecision(true); Decision decision = decider.canRemain(shard, routingNode, allocation); assertThat(decision.type(), equalTo(Decision.Type.NO)); @@ -139,12 +124,8 @@ public void testShardsCannotRemainOnRemovingNode() { } public void testCanAutoExpandToRestartingNode() { - ClusterState state = prepareState( - service.reroute(ClusterState.EMPTY_STATE, "initial state"), - SingleNodeShutdownMetadata.Type.RESTART - ); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); - allocation.debugDecision(true); + ClusterState state = prepareState(SingleNodeShutdownMetadata.Type.RESTART); + RoutingAllocation allocation = createRoutingAllocation(state); Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); @@ -154,11 +135,20 @@ public void testCanAutoExpandToRestartingNode() { ); } - public void testCanAutoExpandToNodeThatIsNotShuttingDown() { - ClusterState state = service.reroute(ClusterState.EMPTY_STATE, "initial state"); + public void testCanAutoExpandToNodeIfNoNodesShuttingDown() { + RoutingAllocation allocation = createRoutingAllocation(ClusterState.EMPTY_STATE); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); - allocation.debugDecision(true); + Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat( + decision.getExplanation(), + equalTo("node [" + DATA_NODE.getId() + "] is not preparing for removal from the cluster") + ); + } + + public void testCanAutoExpandToNodeThatIsNotShuttingDown() { + ClusterState state = prepareState(randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE)); + RoutingAllocation allocation = createRoutingAllocation(state); Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); @@ -166,35 +156,93 @@ public void testCanAutoExpandToNodeThatIsNotShuttingDown() { } public void testCannotAutoExpandToRemovingNode() { - ClusterState state = prepareState( - service.reroute(ClusterState.EMPTY_STATE, "initial state"), - randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE) - ); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); - allocation.debugDecision(true); + ClusterState state = prepareState(randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE)); + RoutingAllocation allocation = createRoutingAllocation(state); Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation); assertThat(decision.type(), equalTo(Decision.Type.NO)); assertThat(decision.getExplanation(), equalTo("node [" + DATA_NODE.getId() + "] is preparing for removal from the cluster")); } - private ClusterState prepareState(ClusterState initialState, SingleNodeShutdownMetadata.Type shutdownType) { - final String targetNodeName = shutdownType == SingleNodeShutdownMetadata.Type.REPLACE ? randomAlphaOfLengthBetween(10, 20) : null; - final SingleNodeShutdownMetadata nodeShutdownMetadata = SingleNodeShutdownMetadata.builder() - .setNodeId(DATA_NODE.getId()) - .setType(shutdownType) - .setReason(this.getTestName()) - .setStartedAtMillis(1L) - .setTargetNodeName(targetNodeName) + public void testAutoExpandDuringNodeReplacement() { + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DATA_NODE).build()) + .metadata(Metadata.builder().put(IndexMetadata.builder(indexMetadata))) + .build(); + + // should auto-expand when no shutdown + Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, createRoutingAllocation(state)); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat( + decision.getExplanation(), + equalTo("node [" + DATA_NODE.getId() + "] is not preparing for removal from the cluster") + ); + + // should auto-expand to source when shutdown/replacement entry is registered and node replacement has not started + NodesShutdownMetadata shutdown = createNodesShutdownMetadata(SingleNodeShutdownMetadata.Type.REPLACE, DATA_NODE.getId()); + state = ClusterState.builder(state) + .metadata(Metadata.builder(state.metadata()).putCustom(NodesShutdownMetadata.TYPE, shutdown).build()) .build(); - NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(new HashMap<>()).putSingleNodeMetadata( - nodeShutdownMetadata + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, createRoutingAllocation(state)), + Decision.Type.YES, + "node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster, but replacement is not yet present" + ); + + // should auto-expand to replacement when node replacement has started + String replacementName = shutdown.getAllNodeMetadataMap().get(DATA_NODE.getId()).getTargetNodeName(); + DiscoveryNode replacementNode = newNode(replacementName, "node-data-1", Collections.singleton(DiscoveryNodeRole.DATA_ROLE)); + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.getNodes()).add(replacementNode).build()).build(); + + assertThatDecision( + decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, createRoutingAllocation(state)), + Decision.Type.NO, + "node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster" + ); + decision = decider.shouldAutoExpandToNode(indexMetadata, replacementNode, createRoutingAllocation(state)); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat( + decision.getExplanation(), + equalTo("node [" + replacementNode.getId() + "] is not preparing for removal from the cluster") ); - return ClusterState.builder(initialState) + } + + private ClusterState prepareState(SingleNodeShutdownMetadata.Type shutdownType) { + return prepareState(shutdownType, DATA_NODE.getId()); + } + + private ClusterState prepareState(SingleNodeShutdownMetadata.Type shutdownType, String nodeId) { + return ClusterState.builder(ClusterName.DEFAULT) .nodes(DiscoveryNodes.builder().add(DATA_NODE).build()) .metadata( - Metadata.builder().put(IndexMetadata.builder(indexMetadata)).putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata) + Metadata.builder() + .put(IndexMetadata.builder(indexMetadata)) + .putCustom(NodesShutdownMetadata.TYPE, createNodesShutdownMetadata(shutdownType, nodeId)) ) .build(); } + + private NodesShutdownMetadata createNodesShutdownMetadata(SingleNodeShutdownMetadata.Type shutdownType, String nodeId) { + final String targetNodeName = shutdownType == SingleNodeShutdownMetadata.Type.REPLACE ? randomAlphaOfLengthBetween(10, 20) : null; + return new NodesShutdownMetadata(new HashMap<>()).putSingleNodeMetadata( + SingleNodeShutdownMetadata.builder() + .setNodeId(nodeId) + .setType(shutdownType) + .setReason(this.getTestName()) + .setStartedAtMillis(1L) + .setTargetNodeName(targetNodeName) + .build() + ); + } + + private RoutingAllocation createRoutingAllocation(ClusterState state) { + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + allocation.debugDecision(true); + return allocation; + } + + private static void assertThatDecision(Decision decision, Decision.Type type, String explanation) { + assertThat(decision.type(), equalTo(type)); + assertThat(decision.getExplanation(), equalTo(explanation)); + } } diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java index 9d010692d87c6..1a9cc35b1d448 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -29,8 +30,8 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -44,7 +45,7 @@ public class NodeShutdownShardsIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(ShutdownPlugin.class); + return Collections.singletonList(ShutdownPlugin.class); } /** @@ -500,45 +501,17 @@ public void testAutoExpandDuringRestart() throws Exception { ); final String nodeB = internalCluster().startNode(); - assertBusy(() -> { - assertThat( - client().admin() - .indices() - .prepareGetSettings("myindex") - .setNames("index.number_of_replicas") - .get() - .getSetting("myindex", "index.number_of_replicas"), - equalTo("1") - ); - }); + assertBusy(() -> assertIndexSetting("myindex", "index.number_of_replicas", "1")); ensureGreen("myindex"); - // Mark the node for shutdown - assertAcked( - client().execute( - PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(primaryNodeId, SingleNodeShutdownMetadata.Type.RESTART, this.getTestName(), null, null) - ).get() - ); - - // RESTART did not reroute, neither should it when we no longer contract replicas, but we provoke it here in the test to ensure - // that auto-expansion has run. + putNodeShutdown(primaryNodeId, SingleNodeShutdownMetadata.Type.RESTART, null); + // registering node shutdown entry does not perform reroute, neither should it. + // we provoke it here in the test to ensure that auto-expansion has run. UpdateSettingsRequestBuilder settingsRequest = client().admin().indices().prepareUpdateSettings("myindex"); settingsRequest.setSettings(Settings.builder().put("index.routing.allocation.exclude.name", "non-existent")); assertAcked(settingsRequest.execute().actionGet()); - assertBusy(() -> { - assertThat( - client().admin() - .indices() - .prepareGetSettings("myindex") - .setNames("index.number_of_replicas") - .get() - .getSetting("myindex", "index.number_of_replicas"), - equalTo("1") - ); - }); - + assertBusy(() -> assertIndexSetting("myindex", "index.number_of_replicas", "1")); client().prepareIndex("myindex", "_doc").setSource("field", "value"); internalCluster().restartNode(primaryNode, new InternalTestCluster.RestartCallback() { @@ -552,6 +525,50 @@ public Settings onNodeStopped(String nodeName) throws Exception { ensureGreen("myindex"); } + public void testAutoExpandDuringReplace() throws Exception { + String node1 = internalCluster().startNode(); + String node2 = internalCluster().startNode(); + + createIndex( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, randomFrom("0-all", "0-1")) + .build() + ); + client().prepareIndex("myindex", "_doc").setSource("field", "value"); + + ensureGreen("index"); + assertIndexSetting("index", "index.number_of_replicas", "1"); + + String nodeNameToReplace = randomFrom(node1, node2); + String nodeIdToReplace = getNodeId(nodeNameToReplace); + String replacementNodeName = "node_t2"; + + putNodeShutdown(nodeIdToReplace, SingleNodeShutdownMetadata.Type.REPLACE, replacementNodeName); + // registering node shutdown entry does not perform reroute, neither should it. + // we provoke it here in the test to ensure that auto-expansion has run. + UpdateSettingsRequestBuilder settingsRequest = client().admin().indices().prepareUpdateSettings("index"); + settingsRequest.setSettings(Settings.builder().put("index.routing.allocation.exclude.name", "non-existent")); + assertAcked(settingsRequest.execute().actionGet()); + + ensureGreen("index"); + assertIndexSetting("index", "index.number_of_replicas", "1"); + + String nodeName3 = internalCluster().startNode(); + assertThat("started node name did not match registered replacement", nodeName3, equalTo(replacementNodeName)); + + ensureGreen("index"); + assertIndexSetting("index", "index.number_of_replicas", "1"); + + assertBusy(() -> assertNodeShutdownStatus(nodeIdToReplace, COMPLETE)); + internalCluster().stopNode(nodeNameToReplace); + + ensureGreen("index"); + assertIndexSetting("index", "index.number_of_replicas", "1"); + } + public void testNodeShutdownWithUnassignedShards() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeAId = getNodeId(nodeA); @@ -634,4 +651,9 @@ private String getNodeId(String nodeName) { .findFirst() .orElseThrow(() -> new AssertionError("requested node name [" + nodeName + "] not found")); } + + private void assertIndexSetting(String index, String setting, String expectedValue) { + GetSettingsResponse response = client().admin().indices().prepareGetSettings(index).get(); + assertThat(response.getSetting(index, setting), equalTo(expectedValue)); + } }