Allow re-allocation of replica shards on nodes during shutdown replacement (elastic#79171)

This commit allows replica shards that have existing data on disk to be re-allocated to the target
of a "REPLACE"-type node shutdown. Previously, if the target node of such a shutdown restarted,
its replicas were not allowed to be allocated even though their data existed on disk.

Relates to elastic#70338 as a follow-up to elastic#76247
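
For illustration, the behavior change can be sketched as a before/after comparison. This is not code from the commit; the names are hypothetical stand-ins, and dataOnDisk abstracts the retention-lease check the real change performs.

// Sketch of the behavior change for a node that is the target of a
// "REPLACE" shutdown. All names here are hypothetical stand-ins.
enum Outcome { ALLOCATED, NOT_ALLOCATED }

class BeforeAfterSketch {
    // Before this commit: the replacement target could not receive the
    // replica, regardless of whether its data was already on disk.
    static Outcome before(boolean dataOnDisk) {
        return Outcome.NOT_ALLOCATED;
    }

    // After this commit: existing on-disk data (detected via a
    // peer-recovery retention lease) makes the target eligible again.
    static Outcome after(boolean dataOnDisk) {
        return dataOnDisk ? Outcome.ALLOCATED : Outcome.NOT_ALLOCATED;
    }

    public static void main(String[] args) {
        System.out.println(before(true)); // NOT_ALLOCATED
        System.out.println(after(true));  // ALLOCATED
    }
}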
dakrone committed Oct 15, 2021
1 parent d1bb15a commit 3ba8d86
Showing 5 changed files with 106 additions and 6 deletions.
@@ -122,4 +122,19 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.YES;
}

/**
* Returns a {@link Decision} for whether the given replica shard can be
* allocated to the given node when a retention lease already exists on
* the node (meaning the shard has been allocated there previously).
*
* This method does not actually check whether there is a retention lease;
* that is the responsibility of the caller.
*
* It defaults to the same value as {@code canAllocate}.
*/
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node,
RoutingAllocation allocation) {
return canAllocate(shardRouting, node, allocation);
}
}
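
The new hook defaults to canAllocate, so existing deciders are unaffected unless they opt in. The following is a minimal sketch of a decider that does opt in, assuming this branch's package layout; the decider name and its blanket YES decisions are illustrative, not from the commit.

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

// Hypothetical decider that is more permissive when the node already
// holds data for the replica (the caller has verified the lease exists).
public class ExampleLeaseAwareDecider extends AllocationDecider {

    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        // A decider's usual, possibly restrictive, rule would live here.
        return Decision.YES;
    }

    @Override
    public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node,
                                                                RoutingAllocation allocation) {
        // Relax the rule: re-using on-disk data is cheaper than a fresh
        // recovery, so allow it even where canAllocate might say NO.
        return Decision.YES;
    }
}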
@@ -231,6 +231,28 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return ret;
}

@Override
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
return Decision.NO;
}
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocateReplicaWhenThereIsRetentionLease(shardRouting, node, allocation);
// short-circuit if a NO is returned.
if (decision.type() == Decision.Type.NO) {
if (allocation.debugDecision() == false) {
return Decision.NO;
} else {
ret.add(decision);
}
} else {
addDecision(ret, decision, allocation);
}
}
return ret;
}

private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
// We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
if (decision != Decision.ALWAYS
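The override above follows the same fold as the other AllocationDeciders methods: ask each decider in turn and short-circuit on the first NO unless debug output was requested. A self-contained sketch of that pattern, with hypothetical types rather than the Elasticsearch classes:

import java.util.List;
import java.util.function.Supplier;

// Self-contained sketch of the short-circuiting combine used above.
class ShortCircuitSketch {
    enum Type { YES, THROTTLE, NO }

    static Type combine(List<Supplier<Type>> deciders, boolean debug) {
        Type worst = Type.YES;
        for (Supplier<Type> decider : deciders) {
            Type t = decider.get();
            if (t == Type.NO && debug == false) {
                return Type.NO; // first NO wins immediately
            }
            if (t.ordinal() > worst.ordinal()) {
                worst = t; // NO > THROTTLE > YES, mirroring Decision.Multi
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        System.out.println(combine(List.of(() -> Type.YES, () -> Type.THROTTLE), false)); // THROTTLE
        System.out.println(combine(List.of(() -> Type.NO, () -> Type.YES), false));       // NO
    }
}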
@@ -97,6 +97,17 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
}
}

@Override
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (isReplacementTargetName(allocation, node.node().getName())) {
return Decision.single(Decision.Type.YES, NAME,
"node [%s] is a node replacement target and can have a previously allocated replica re-allocated to it",
node.nodeId());
} else {
return canAllocate(shardRouting, node, allocation);
}
}

/**
* Returns true if there are any node replacements ongoing in the cluster
*/
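The target-name check above (isReplacementTargetName) asks whether any ongoing REPLACE shutdown names this node as its target. A hypothetical sketch of that lookup; the real implementation reads the cluster's node-shutdown metadata:

import java.util.Map;

// Hypothetical sketch: a node is a replacement target if some ongoing
// REPLACE-type shutdown lists its name as the target.
class ReplacementTargetSketch {
    // shutting-down node id -> target node name
    static boolean isReplacementTargetName(Map<String, String> replaceShutdowns, String nodeName) {
        return replaceShutdowns.containsValue(nodeName);
    }

    public static void main(String[] args) {
        Map<String, String> shutdowns = Map.of("nodeA-id", "nodeC");
        System.out.println(isReplacementTargetName(shutdowns, "nodeC")); // true
        System.out.println(isReplacementTargetName(shutdowns, "nodeB")); // false
    }
}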
@@ -188,7 +188,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unassignedShard, final RoutingAllocation allocation, final Logger logger) {
} else if (matchingNodes.getNodeWithHighestMatch() != null) {
RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId());
// we only check on THROTTLE since we checked before on NO
- Decision decision = allocation.deciders().canAllocate(unassignedShard, nodeWithHighestMatch, allocation);
+ Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(unassignedShard,
+     nodeWithHighestMatch, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store",
unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node());
@@ -245,7 +246,7 @@ public static Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLeastOneNode(ShardRouting shard, RoutingAllocation allocation) {
}
// if we can't allocate it on a node, ignore it, for example, this handles
// cases for only allocating a replica after a primary
- Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
+ Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
if (decision.type() == Decision.Type.YES && madeDecision.type() != Decision.Type.YES) {
if (explain) {
madeDecision = decision;
@@ -317,17 +318,26 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
continue;
}

- // check if we can allocate on that node...
- // we only check for NO, since if this node is THROTTLING and it has enough "same data"
- // then we will try and assign it next time
- Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
+ // Check whether we have existing data for the replica
+ final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(discoNode);
+ final Decision decision;
+ if (retainingSeqNoForReplica == -1) {
+     // There is no existing replica data on the node
+     decision = allocation.deciders().canAllocate(shard, node, allocation);
+ } else {
+     // There is existing replica data on the node
+     decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
+ }

MatchingNode matchingNode = null;
if (explain) {
matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata);
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingNode.matchingBytes);
nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
}

+ // we only check for NO, since if this node is THROTTLING and it has enough "same data"
+ // then we will try and assign it next time
if (decision.type() == Decision.Type.NO) {
continue;
}
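The -1 sentinel above comes from getPeerRecoveryRetentionLeaseRetainingSeqNo: no retention lease for the node means no existing replica data, so the allocator stays on the ordinary canAllocate path. A self-contained sketch of that selection, with a hypothetical map standing in for the primary's store metadata:

import java.util.Map;

// Sketch of how the allocator picks a decider path per node, keyed on
// whether the primary holds a peer-recovery retention lease for it.
class LeasePathSketch {
    static final long NO_LEASE = -1L; // sentinel used by the real code

    static String deciderPathFor(Map<String, Long> retainingSeqNoByNode, String nodeId) {
        long seqNo = retainingSeqNoByNode.getOrDefault(nodeId, NO_LEASE);
        return seqNo == NO_LEASE
            ? "canAllocate"                                   // no data on disk
            : "canAllocateReplicaWhenThereIsRetentionLease";  // data may be reused
    }

    public static void main(String[] args) {
        Map<String, Long> leases = Map.of("nodeC", 42L);
        System.out.println(deciderPathFor(leases, "nodeB")); // canAllocate
        System.out.println(deciderPathFor(leases, "nodeC")); // canAllocateReplicaWhenThereIsRetentionLease
    }
}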
@@ -439,6 +439,48 @@ public void testNodeReplacementOnlyToTarget() throws Exception {
});
}

public void testReallocationForReplicaDuringNodeReplace() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeAId = getNodeId(nodeA);
createIndex("myindex", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
ensureYellow("myindex");

// Start a second node, so the replica will be on nodeB
final String nodeB = internalCluster().startNode();
ensureGreen("myindex");

final String nodeC = internalCluster().startNode();

// Register a replace for nodeA, with nodeC as the target
PutShutdownNodeAction.Request shutdownRequest = new PutShutdownNodeAction.Request(
nodeAId,
SingleNodeShutdownMetadata.Type.REPLACE,
"testing",
null,
nodeC
);
client().execute(PutShutdownNodeAction.INSTANCE, shutdownRequest).get();

// Wait for the node replace shutdown to be complete
assertBusy(() -> {
GetShutdownStatusAction.Response shutdownStatus = client().execute(
GetShutdownStatusAction.INSTANCE,
new GetShutdownStatusAction.Request(nodeAId)
).get();
assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
});

// Remove nodeA from the cluster (it's been terminated)
internalCluster().stopNode(nodeA);

// Restart nodeC; the replica on nodeB will be promoted to primary, and
// when nodeC comes back up it should have the replica assigned to it
internalCluster().restartNode(nodeC);

// All shards for the index should be allocated
ensureGreen("myindex");
}

private void indexRandomData() throws Exception {
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
