diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index 5631fd2db78e0..3b8a5ef1bf81c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -122,4 +122,19 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return Decision.YES; } + + /** + * Returns a {@link Decision} whether the given replica shard can be + * allocated to the given node when there is an existing retention lease + * already existing on the node (meaning it has been allocated there previously) + * + * This method does not actually check whether there is a retention lease, + * that is the responsibility of the caller. + * + * It defaults to the same value as {@code canAllocate}. + */ + public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, + RoutingAllocation allocation) { + return canAllocate(shardRouting, node, allocation); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index 5f5f361ec5c19..406a9a4c99216 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -231,6 +231,28 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing return ret; } + @Override + public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) { + return Decision.NO; + } + Decision.Multi ret = new Decision.Multi(); + for (AllocationDecider allocationDecider : allocations) { + Decision decision = allocationDecider.canAllocateReplicaWhenThereIsRetentionLease(shardRouting, node, allocation); + // short track if a NO is returned. + if (decision.type() == Decision.Type.NO) { + if (allocation.debugDecision() == false) { + return Decision.NO; + } else { + ret.add(decision); + } + } else { + addDecision(ret, decision, allocation); + } + } + return ret; + } + private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) { // We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES). if (decision != Decision.ALWAYS diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java index 623838bd64b81..c83488a9eb1a1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java @@ -97,6 +97,17 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing } } + @Override + public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (isReplacementTargetName(allocation, node.node().getName())) { + return Decision.single(Decision.Type.YES, NAME, + "node [%s] is a node replacement target and can have a previously allocated replica re-allocated to it", + node.nodeId()); + } else { + return canAllocate(shardRouting, node, allocation); + } + } + /** * Returns true if there are any node replacements ongoing in the cluster */ diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 6eb1e25078731..9f99725a53917 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -188,7 +188,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas } else if (matchingNodes.getNodeWithHighestMatch() != null) { RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId()); // we only check on THROTTLE since we checked before on NO - Decision decision = allocation.deciders().canAllocate(unassignedShard, nodeWithHighestMatch, allocation); + Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(unassignedShard, + nodeWithHighestMatch, allocation); if (decision.type() == Decision.Type.THROTTLE) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store", unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node()); @@ -245,7 +246,7 @@ public static Tuple> canBeAllocatedT } // if we can't allocate it on a node, ignore it, for example, this handles // cases for only allocating a replica after a primary - Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation); if (decision.type() == Decision.Type.YES && madeDecision.type() != Decision.Type.YES) { if (explain) { madeDecision = decision; @@ -317,10 +318,17 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al continue; } - // check if we can allocate on that node... - // we only check for NO, since if this node is THROTTLING and it has enough "same data" - // then we will try and assign it next time - Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + // Check whether we have existing data for the replica + final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(discoNode); + final Decision decision; + if (retainingSeqNoForReplica == -1) { + // There is no existing replica data on the node + decision = allocation.deciders().canAllocate(shard, node, allocation); + } else { + // There is existing replica data on the node + decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation); + } + MatchingNode matchingNode = null; if (explain) { matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata); @@ -328,6 +336,8 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision)); } + // we only check for NO, since if this node is THROTTLING and it has enough "same data" + // then we will try and assign it next time if (decision.type() == Decision.Type.NO) { continue; } diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java index 7ee34255134dc..1b9e5e47033ba 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java @@ -439,6 +439,48 @@ public void testNodeReplacementOnlyToTarget() throws Exception { }); } + public void testReallocationForReplicaDuringNodeReplace() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeAId = getNodeId(nodeA); + createIndex("myindex", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); + ensureYellow("myindex"); + + // Start a second node, so the replica will be on nodeB + final String nodeB = internalCluster().startNode(); + ensureGreen("myindex"); + + final String nodeC = internalCluster().startNode(); + + // Register a replace for nodeA, with nodeC as the target + PutShutdownNodeAction.Request shutdownRequest = new PutShutdownNodeAction.Request( + nodeAId, + SingleNodeShutdownMetadata.Type.REPLACE, + "testing", + null, + nodeC + ); + client().execute(PutShutdownNodeAction.INSTANCE, shutdownRequest).get(); + + // Wait for the node replace shutdown to be complete + assertBusy(() -> { + GetShutdownStatusAction.Response shutdownStatus = client().execute( + GetShutdownStatusAction.INSTANCE, + new GetShutdownStatusAction.Request(nodeAId) + ).get(); + assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE)); + }); + + // Remove nodeA from the cluster (it's been terminated) + internalCluster().stopNode(nodeA); + + // Restart nodeC, the replica on nodeB will be flipped to primary and + // when nodeC comes back up, it should have the replica assigned to it + internalCluster().restartNode(nodeC); + + // All shards for the index should be allocated + ensureGreen("myindex"); + } + private void indexRandomData() throws Exception { int numDocs = scaledRandomIntBetween(100, 1000); IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];