Add node REPLACE shutdown implementation #76247

Merged Oct 7, 2021 (36 commits)

Changes from 9 commits

Commits
d1b8218
WIP, basic implementation
dakrone Jul 28, 2021
fd0c597
Pull `if` branch into a variable
dakrone Oct 4, 2021
41c6238
Remove outdated javadoc
dakrone Oct 4, 2021
a96e596
Remove map iteration, use target name instead of id (whoops)
dakrone Oct 4, 2021
238112a
Remove streaming from isReplacementSource
dakrone Oct 4, 2021
2402803
Simplify getReplacementName
dakrone Oct 4, 2021
ff30a2a
Only calculate node shutdowns if canRemain==false and forceMove==false
dakrone Oct 5, 2021
47500c7
Move canRebalance comment in BalancedShardsAllocator
dakrone Oct 5, 2021
9ded9f3
Rename canForceDuringVacate -> canForceAllocateDuringReplace
dakrone Oct 5, 2021
f3613bc
Add comment to AwarenessAllocationDecider.canForceAllocateDuringReplace
dakrone Oct 5, 2021
7b4c518
Revert changes to ClusterRebalanceAllocationDecider
dakrone Oct 5, 2021
4ce00ef
Change "no replacement" decision message in NodeReplacementAllocation…
dakrone Oct 5, 2021
27ebeeb
Only construct shutdown map once in isReplacementSource
dakrone Oct 5, 2021
cb1c0c4
Make node shutdowns and target shutdowns available within RoutingAllo…
dakrone Oct 5, 2021
c2571ee
Add randomization for adding the filter that is overridden in test
dakrone Oct 5, 2021
fe1889c
Add integration test with replicas: 1
dakrone Oct 5, 2021
4f0da9c
Go nuts with the verbosity of allocation decisions
dakrone Oct 5, 2021
d659b11
Also check NODE_C in unit test
dakrone Oct 5, 2021
448968b
Test with randomly assigned shard
dakrone Oct 5, 2021
e83b630
Fix test for extra verbose decision messages
dakrone Oct 5, 2021
df91ec1
Remove canAllocate(IndexMetadat, RoutingNode, RoutingAllocation) over…
dakrone Oct 5, 2021
5724f2a
Merge remote-tracking branch 'origin/master' into node-replacement-de…
dakrone Oct 5, 2021
c3f6c23
Spotless :|
dakrone Oct 5, 2021
bc4c78b
Implement 100% disk usage check during force-replace-allocate
dakrone Oct 5, 2021
21e9f9e
Add rudimentary documentation for "replace" shutdown type
dakrone Oct 5, 2021
f7c407f
Use RoutingAllocation shutdown map in BalancedShardsAllocator
dakrone Oct 6, 2021
06320e3
Add canForceAllocateDuringReplace to AllocationDeciders & add test
dakrone Oct 6, 2021
4ab4496
Switch from percentage to bytes in DiskThresholdDecider force check
dakrone Oct 6, 2021
e8773b7
Enhance docs with note about rollover, creation, & shrink
dakrone Oct 6, 2021
658a721
Clarify decision messages, add test for target-only allocation
dakrone Oct 6, 2021
d4c8650
Simplify NodeReplacementAllocationDecider.replacementOngoing
dakrone Oct 6, 2021
fa92698
Start nodeC before nodeB in integration test
dakrone Oct 6, 2021
c27ea49
Merge remote-tracking branch 'origin/master' into node-replacement-de…
dakrone Oct 6, 2021
c237f95
Spotleeeessssssss! You get me every time!
dakrone Oct 6, 2021
f692e63
Remove outdated comment
dakrone Oct 7, 2021
f31c949
Merge remote-tracking branch 'origin/master' into node-replacement-de…
dakrone Oct 7, 2021
3 changes: 2 additions & 1 deletion docs/reference/shutdown/apis/shutdown-put.asciidoc
@@ -66,7 +66,8 @@ Use `remove` when you need to permanently remove a node from the cluster.
 The node is not marked ready for shutdown until data is migrated off of the node
 Use `replace` to do a 1:1 replacement of a node with another node. Certain allocation decisions will
 be ignored (such as disk watermarks) in the interest of true replacement of the source node with the
-target node.
+target node. During a replace-type shutdown, rollover and index creation may result in unassigned
+shards, and shrink may fail until the replacement is complete.

`reason`::
(Required, string)
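For context, the docs above describe the node shutdown API. A replace-type shutdown would be registered with a request along these lines, per the shutdown API reference (the node ID, reason, and target node name below are placeholders):

```console
PUT /_nodes/my-node-id/shutdown
{
  "type": "replace",
  "reason": "hardware failure, swapping in a new host",
  "target_node_name": "replacement-node"
}
```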
@@ -31,12 +31,12 @@
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
 import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.gateway.PriorityComparator;

 import java.util.ArrayList;
@@ -690,11 +690,10 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
  */
 MoveDecision moveDecision = decideMove(shardRouting, sourceNode, canRemain, this::decideCanAllocate);
 if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) {
-    final boolean shardsOnReplacedNodes = allocation.metadata().nodeShutdowns().values().stream()
-        .filter(s -> s.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
-        .map(SingleNodeShutdownMetadata::getNodeId)
-        .anyMatch(shardRouting.currentNodeId()::equals);
-    if (shardsOnReplacedNodes) {
+    final SingleNodeShutdownMetadata shutdown = allocation.nodeShutdowns().get(shardRouting.currentNodeId());
+    final boolean shardsOnReplacedNode = shutdown != null &&
+        shutdown.getType().equals(SingleNodeShutdownMetadata.Type.REPLACE);
+    if (shardsOnReplacedNode) {
         return decideMove(shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate);
     }
 }
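The control flow in this hunk can be sketched in isolation. Below is a simplified, self-contained model (the type and method names are illustrative stand-ins, not Elasticsearch's real classes) of the fallback: when a shard can neither remain nor move normally, and its current node is the source of a REPLACE shutdown, the move is retried with the force-allocate deciders:

```java
import java.util.Map;

public class MoveFallbackDemo {
    enum ShutdownType { RESTART, REMOVE, REPLACE }
    enum MoveResult { REMAIN, MOVE, FORCE_MOVE, NO_MOVE }

    /**
     * Models the decideMove flow: try a normal move first; if the shard can
     * neither remain nor move, and its node is being replaced, fall back to
     * "force" mode (which ignores deciders such as disk watermarks).
     */
    static MoveResult decideMove(String currentNodeId,
                                 Map<String, ShutdownType> nodeShutdowns,
                                 boolean canRemain,
                                 boolean canMoveNormally) {
        if (canRemain) {
            return MoveResult.REMAIN;
        }
        if (canMoveNormally) {
            return MoveResult.MOVE;
        }
        // Single map lookup, as in the PR, instead of streaming all shutdowns.
        ShutdownType shutdown = nodeShutdowns.get(currentNodeId);
        boolean onReplacedNode = shutdown == ShutdownType.REPLACE;
        return onReplacedNode ? MoveResult.FORCE_MOVE : MoveResult.NO_MOVE;
    }
}
```

Note that the rewritten production code also avoids the stream over all shutdown entries; a single `get` on the shutdown map suffices because only the shard's current node matters.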
@@ -212,6 +212,25 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
return ret;
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canForceAllocateDuringReplace(shardRouting, node, allocation);
// short track if a NO is returned.
if (decision.type() == Decision.Type.NO) {
if (allocation.debugDecision() == false) {
return Decision.NO;
} else {
ret.add(decision);
}
} else {
addDecision(ret, decision, allocation);
}
}
return ret;
}

private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
// We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
if (decision != Decision.ALWAYS
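The short-circuit behaviour of the combined decider above can be modelled with plain enums. This sketch uses hypothetical names, not the real Elasticsearch types, and shows why debug mode keeps iterating while normal mode returns on the first NO:

```java
import java.util.List;

public class DeciderShortCircuitDemo {
    enum Type { YES, NO }

    /**
     * Combine per-decider results: NO wins. Outside debug mode we return on
     * the first NO; in debug mode we keep going so that every decider's
     * opinion can be reported in the allocation explanation.
     */
    static Type combine(List<Type> deciderResults, boolean debug) {
        Type result = Type.YES;
        for (Type d : deciderResults) {
            if (d == Type.NO) {
                if (!debug) {
                    return Type.NO; // short-circuit: skip remaining deciders
                }
                result = Type.NO; // remember the NO, but keep collecting
            }
        }
        return result;
    }
}
```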
@@ -333,12 +333,12 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
 final long shardSize = getExpectedShardSize(shardRouting, 0L,
     allocation.clusterInfo(), allocation.snapshotShardSizeInfo(), allocation.metadata(), allocation.routingTable());
 assert shardSize >= 0 : shardSize;
-double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
-if (freeSpaceAfterShard < 0) {
+final long freeBytesAfterShard = usage.getFreeBytes() - shardSize;
+if (freeBytesAfterShard < 0) {
     return Decision.single(Decision.Type.NO, NAME,
         "unable to force allocate shard to [%s] during replacement, " +
-        "as allocating to this node would cause disk usage to exceed 100%% ([%s] free)",
-        node.nodeId(), Strings.format1Decimals(freeSpaceAfterShard, "%"));
+        "as allocating to this node would cause disk usage to exceed 100%% ([%s] bytes above available disk space)",
+        node.nodeId(), -freeBytesAfterShard);
 } else {
     return super.canForceAllocateDuringReplace(shardRouting, node, allocation);
 }
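The byte-based check this hunk switches to is straightforward to model on its own. A minimal sketch, with illustrative names rather than Elasticsearch's real classes: subtract the expected shard size from the node's free bytes, and refuse the forced allocation if the result is negative.

```java
public class ForceAllocateDiskCheckDemo {
    /**
     * Returns an explanation string when the shard would not fit on the node,
     * or null to defer to the remaining deciders. Mirrors the PR's switch
     * from a percentage to a raw byte comparison.
     */
    static String checkForceAllocate(long freeBytes, long shardSize, String nodeId) {
        final long freeBytesAfterShard = freeBytes - shardSize;
        if (freeBytesAfterShard < 0) {
            return "unable to force allocate shard to [" + nodeId + "] during replacement, "
                + "as allocating to this node would cause disk usage to exceed 100% (["
                + (-freeBytesAfterShard) + "] bytes above available disk space)";
        }
        return null; // shard fits: defer to the superclass / other deciders
    }
}
```

Reporting the overshoot in bytes (rather than a negative percentage) is what makes the new decision message readable, as the updated unit test below confirms.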
@@ -38,12 +38,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
     shardRouting.currentNodeId(), getReplacementName(allocation, shardRouting.currentNodeId()));
 } else if (isReplacementSource(allocation, node.nodeId())) {
     return Decision.single(Decision.Type.NO, NAME,
-        "node [%s] is being replaced by [%s], so no data from other node [%s] may be allocated to it",
+        "node [%s] is being replaced by [%s], so no data may be allocated to it",
         node.nodeId(), getReplacementName(allocation, node.nodeId()), shardRouting.currentNodeId());
 } else if (isReplacementTargetName(allocation, node.node().getName())) {
     final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.node().getName());
     return Decision.single(Decision.Type.NO, NAME,
-        "node [%s] is replacing the vacating node [%s], so no data from other node [%s] " +
+        "node [%s] is replacing the vacating node [%s], only data currently allocated to the source node " +
         "may be allocated to it until the replacement is complete",
         node.nodeId(), shutdown == null ? null : shutdown.getNodeId(), shardRouting.currentNodeId());
 } else {
@@ -101,8 +101,7 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
  * Returns true if there are any node replacements ongoing in the cluster
  */
 private static boolean replacementOngoing(RoutingAllocation allocation) {
-    return allocation.nodeShutdowns().values().stream()
-        .anyMatch(shutdown -> shutdown.getType().equals(SingleNodeShutdownMetadata.Type.REPLACE));
+    return allocation.replacementTargetShutdowns().isEmpty() == false;
 }

/**
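The `replacementOngoing` simplification trades a per-call stream over every shutdown entry for an emptiness check on a map computed once per allocation round. A toy comparison of the two shapes, using illustrative types only:

```java
import java.util.Map;
import java.util.stream.Collectors;

public class ReplacementOngoingDemo {
    enum ShutdownType { RESTART, REMOVE, REPLACE }

    // Old shape: stream every shutdown entry on every decider invocation.
    static boolean replacementOngoingStream(Map<String, ShutdownType> nodeShutdowns) {
        return nodeShutdowns.values().stream().anyMatch(t -> t == ShutdownType.REPLACE);
    }

    // New shape: filter the REPLACE entries once up front (as RoutingAllocation
    // now does), so each later check is a constant-time emptiness test.
    static Map<String, ShutdownType> replacementTargets(Map<String, ShutdownType> nodeShutdowns) {
        return nodeShutdowns.entrySet().stream()
            .filter(e -> e.getValue() == ShutdownType.REPLACE)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    static boolean replacementOngoing(Map<String, ShutdownType> replacementTargets) {
        return replacementTargets.isEmpty() == false;
    }
}
```

Since allocation deciders run once per shard per node, hoisting the filtering out of the hot path is the point of the change.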
@@ -573,6 +573,6 @@ public void testCannotForceAllocateOver100PercentUsage() {

 assertThat(decision.getExplanation(), containsString(
     "unable to force allocate shard to [node_0] during replacement, " +
-    "as allocating to this node would cause disk usage to exceed 100% ([-" + shardSize + "%] free)"));
+    "as allocating to this node would cause disk usage to exceed 100% ([" + shardSize + "] bytes above available disk space)"));
}
}
@@ -189,7 +189,7 @@ public void testCanAllocateToNeitherSourceNorTarget() {
 assertThat(
     decision.getExplanation(),
     equalTo("node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getName() +
-        "], so no data from other node [" + testShard.currentNodeId() + "] may be allocated to it")
+        "], so no data may be allocated to it")
 );

 routingNode = new RoutingNode(NODE_B.getId(), NODE_B, testShard);
@@ -199,8 +199,8 @@ public void testCanAllocateToNeitherSourceNorTarget() {
 assertThat(
     decision.getExplanation(),
     equalTo("node [" + NODE_B.getId() +
-        "] is replacing the vacating node [" + NODE_A.getId() + "], so no data from other node [" +
-        testShard.currentNodeId() + "] may be allocated to it until the replacement is complete")
+        "] is replacing the vacating node [" + NODE_A.getId() + "], only data currently allocated " +
+        "to the source node may be allocated to it until the replacement is complete")
 );

routingNode = new RoutingNode(NODE_C.getId(), NODE_C, testShard);
@@ -294,7 +294,7 @@ public void testNodeReplacementOverridesFilters() throws Exception {
     .put("index.number_of_replicas", 0);
 createIndex("myindex", nodeASettings.build());
 final String nodeAId = getNodeId(nodeA);
-final String nodeB = "node_t1"; // TODO: fix this to so it's actually overrideable
+final String nodeB = "node_t2"; // TODO: fix this to so it's actually overrideable

// Mark the nodeA as being replaced
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
@@ -314,6 +314,7 @@ public void testNodeReplacementOverridesFilters() throws Exception {

 assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(STALLED));

+final String nodeC = internalCluster().startNode();
 internalCluster().startNode(Settings.builder().put("node.name", nodeB));
 final String nodeBId = getNodeId(nodeB);

@@ -339,8 +340,6 @@ public void testNodeReplacementOverridesFilters() throws Exception {
 assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
 });

-final String nodeC = internalCluster().startNode();
-
 createIndex("other", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());

ensureYellow("other");
@@ -382,6 +381,65 @@ public void testNodeReplacementOverridesFilters() throws Exception {
}, () -> fail("expected a 'NO' decision for nodeB but there was no explanation for that node"));
}

public void testNodeReplacementOnlyToTarget() throws Exception {
String nodeA = internalCluster().startNode(
Settings.builder().put("node.name", "node-a").put("cluster.routing.rebalance.enable", "none")
);
// Create an index and pin it to nodeA, when we replace it with nodeB,
// it'll move the data, overridding the `_name` allocation filter
[Review comment, Contributor]: Can we update this comment? It looks like a copy-paste error, since we do not pin the index here.
Settings.Builder nodeASettings = Settings.builder().put("index.number_of_shards", 4).put("index.number_of_replicas", 0);
createIndex("myindex", nodeASettings.build());
final String nodeAId = getNodeId(nodeA);
final String nodeB = "node_t1"; // TODO: fix this to so it's actually overrideable
final String nodeC = "node_t2"; // TODO: fix this to so it's actually overrideable

// Mark the nodeA as being replaced
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
nodeAId,
SingleNodeShutdownMetadata.Type.REPLACE,
this.getTestName(),
null,
nodeB
);
AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get();
assertTrue(putShutdownResponse.isAcknowledged());

GetShutdownStatusAction.Response getResp = client().execute(
GetShutdownStatusAction.INSTANCE,
new GetShutdownStatusAction.Request(nodeAId)
).get();

assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(STALLED));

internalCluster().startNode(Settings.builder().put("node.name", nodeB));
internalCluster().startNode(Settings.builder().put("node.name", nodeC));
Comment on lines +412 to +413

[Contributor]: I wonder if we can swap the order of starting nodes? Suppose we allowed allocating to both nodes (a bug). We would then risk the shards moving over to B (they only have to start the relocation) before node C has started.

[Member Author]: We do have another test that starts nodeC prior to nodeB for the replacement (it was one of the changes you asked for), which I think satisfies this?

[Contributor]: Yeah, I did notice that, and thanks for doing that. I was only adding this comment because this test is named testNodeReplacementOnlyToTarget and it seems wrong to then not have another node that shards could be allocated to. But I agree that the case is covered, so you can consider this comment optional for sure.

final String nodeBId = getNodeId(nodeB);
final String nodeCId = getNodeId(nodeC);

logger.info("--> NodeA: {} -- {}", nodeA, nodeAId);
logger.info("--> NodeB: {} -- {}", nodeB, nodeBId);
logger.info("--> NodeC: {} -- {}", nodeC, nodeCId);

assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().clear().setRoutingTable(true).get().getState();
for (ShardRouting sr : state.routingTable().allShards("myindex")) {
assertThat(
"expected all shards for index to be on node B (" + nodeBId + ") but " + sr.toString() + " is on " + sr.currentNodeId(),
sr.currentNodeId(),
equalTo(nodeBId)
);
}
});

assertBusy(() -> {
GetShutdownStatusAction.Response shutdownStatus = client().execute(
GetShutdownStatusAction.INSTANCE,
new GetShutdownStatusAction.Request(nodeAId)
).get();
assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
});
}

private void indexRandomData() throws Exception {
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];