Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add node REPLACE shutdown implementation #76247

Merged
merged 36 commits into from
Oct 7, 2021
Merged
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d1b8218
WIP, basic implementation
dakrone Jul 28, 2021
fd0c597
Pull `if` branch into a variable
dakrone Oct 4, 2021
41c6238
Remove outdated javadoc
dakrone Oct 4, 2021
a96e596
Remove map iteration, use target name instead of id (whoops)
dakrone Oct 4, 2021
238112a
Remove streaming from isReplacementSource
dakrone Oct 4, 2021
2402803
Simplify getReplacementName
dakrone Oct 4, 2021
ff30a2a
Only calculate node shutdowns if canRemain==false and forceMove==false
dakrone Oct 5, 2021
47500c7
Move canRebalance comment in BalancedShardsAllocator
dakrone Oct 5, 2021
9ded9f3
Rename canForceDuringVacate -> canForceAllocateDuringReplace
dakrone Oct 5, 2021
f3613bc
Add comment to AwarenessAllocationDecider.canForceAllocateDuringReplace
dakrone Oct 5, 2021
7b4c518
Revert changes to ClusterRebalanceAllocationDecider
dakrone Oct 5, 2021
4ce00ef
Change "no replacement" decision message in NodeReplacementAllocation…
dakrone Oct 5, 2021
27ebeeb
Only construct shutdown map once in isReplacementSource
dakrone Oct 5, 2021
cb1c0c4
Make node shutdowns and target shutdowns available within RoutingAllo…
dakrone Oct 5, 2021
c2571ee
Add randomization for adding the filter that is overridden in test
dakrone Oct 5, 2021
fe1889c
Add integration test with replicas: 1
dakrone Oct 5, 2021
4f0da9c
Go nuts with the verbosity of allocation decisions
dakrone Oct 5, 2021
d659b11
Also check NODE_C in unit test
dakrone Oct 5, 2021
448968b
Test with randomly assigned shard
dakrone Oct 5, 2021
e83b630
Fix test for extra verbose decision messages
dakrone Oct 5, 2021
df91ec1
Remove canAllocate(IndexMetadat, RoutingNode, RoutingAllocation) over…
dakrone Oct 5, 2021
5724f2a
Merge remote-tracking branch 'origin/master' into node-replacement-de…
dakrone Oct 5, 2021
c3f6c23
Spotless :|
dakrone Oct 5, 2021
bc4c78b
Implement 100% disk usage check during force-replace-allocate
dakrone Oct 5, 2021
21e9f9e
Add rudimentary documentation for "replace" shutdown type
dakrone Oct 5, 2021
f7c407f
Use RoutingAllocation shutdown map in BalancedShardsAllocator
dakrone Oct 6, 2021
06320e3
Add canForceAllocateDuringReplace to AllocationDeciders & add test
dakrone Oct 6, 2021
4ab4496
Switch from percentage to bytes in DiskThresholdDecider force check
dakrone Oct 6, 2021
e8773b7
Enhance docs with note about rollover, creation, & shrink
dakrone Oct 6, 2021
658a721
Clarify decision messages, add test for target-only allocation
dakrone Oct 6, 2021
d4c8650
Simplify NodeReplacementAllocationDecider.replacementOngoing
dakrone Oct 6, 2021
fa92698
Start nodeC before nodeB in integration test
dakrone Oct 6, 2021
c27ea49
Merge remote-tracking branch 'origin/master' into node-replacement-de…
dakrone Oct 6, 2021
c237f95
Spotleeeessssssss! You get me every time!
dakrone Oct 6, 2021
f692e63
Remove outdated comment
dakrone Oct 7, 2021
f31c949
Merge remote-tracking branch 'origin/master' into node-replacement-de…
dakrone Oct 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions docs/reference/shutdown/apis/shutdown-put.asciidoc
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ Migrates ongoing tasks and index shards to other nodes as needed
to prepare a node to be restarted or shut down and removed from the cluster.
This ensures that {es} can be stopped safely with minimal disruption to the cluster.

You must specify the type of shutdown: `restart` or `remove`.
You must specify the type of shutdown: `restart`, `remove`, or `replace`.
If a node is already being prepared for shutdown,
you can use this API to change the shutdown type.

@@ -58,12 +58,16 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]

`type`::
(Required, string)
Valid values are `restart` and `remove`.
Valid values are `restart`, `remove`, or `replace`.
Use `restart` when you need to temporarily shut down a node to perform an upgrade,
make configuration changes, or perform other maintenance.
Because the node is expected to rejoin the cluster, data is not migrated off of the node.
Use `remove` when you need to permanently remove a node from the cluster.
The node is not marked ready for shutdown until data is migrated off of the node
Use `replace` to do a 1:1 replacement of a node with another node. Certain allocation decisions will
be ignored (such as disk watermarks) in the interest of true replacement of the source node with the
target node. During a replace-type shutdown, rollover and index creation may result in unassigned
shards, and shrink may fail until the replacement is complete.

`reason`::
(Required, string)
@@ -76,6 +80,13 @@ it does not affect the shut down process.
Only valid if `type` is `restart`. Controls how long {es} will wait for the node to restart and join the cluster before reassigning its shards to other nodes. This works the same as
<<delayed-allocation,delaying allocation>> with the `index.unassigned.node_left.delayed_timeout` setting. If you specify both a restart allocation delay and an index-level allocation delay, the longer of the two is used.

`target_node_name`::
(Optional, string)
Only valid if `type` is `replace`. Specifies the name of the node that is replacing the node being
shut down. Shards from the shut down node are only allowed to be allocated to the target node, and
no other data will be allocated to the target node. During relocation of data certain allocation
rules are ignored, such as disk watermarks or user attribute filtering rules.

[[put-shutdown-api-example]]
==== {api-examples-title}

Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
@@ -49,7 +50,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@@ -60,6 +60,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestMetadata;
@@ -202,6 +203,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -114,7 +115,7 @@ private SingleNodeShutdownMetadata(
if (targetNodeName != null && type != Type.REPLACE) {
throw new IllegalArgumentException(new ParameterizedMessage("target node name is only valid for REPLACE type shutdowns, " +
"but was given type [{}] and target node name [{}]", type, targetNodeName).getFormattedMessage());
} else if (targetNodeName == null && type == Type.REPLACE) {
} else if (Strings.hasText(targetNodeName) == false && type == Type.REPLACE) {
throw new IllegalArgumentException("target node name is required for REPLACE type shutdowns");
}
this.targetNodeName = targetNodeName;
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNodes;
@@ -24,6 +25,7 @@
import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -71,6 +73,9 @@ public class RoutingAllocation {
nodesChangedObserver, indexMetadataUpdater, restoreInProgressUpdater
);

private final Map<String, SingleNodeShutdownMetadata> nodeShutdowns;
private final Map<String, SingleNodeShutdownMetadata> nodeReplacementTargets;


/**
* Creates a new {@link RoutingAllocation}
@@ -90,6 +95,14 @@ public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes,
this.clusterInfo = clusterInfo;
this.shardSizeInfo = shardSizeInfo;
this.currentNanoTime = currentNanoTime;
this.nodeShutdowns = metadata.nodeShutdowns();
Map<String, SingleNodeShutdownMetadata> targetNameToShutdown = new HashMap<>();
for (SingleNodeShutdownMetadata shutdown : this.nodeShutdowns.values()) {
if (shutdown.getType() == SingleNodeShutdownMetadata.Type.REPLACE) {
targetNameToShutdown.put(shutdown.getTargetNodeName(), shutdown);
}
}
this.nodeReplacementTargets = Collections.unmodifiableMap(targetNameToShutdown);
}

/** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */
@@ -145,6 +158,20 @@ public SnapshotShardSizeInfo snapshotShardSizeInfo() {
return shardSizeInfo;
}

/**
* Returns the map of node id to shutdown metadata currently in the cluster
*/
public Map<String, SingleNodeShutdownMetadata> nodeShutdowns() {
return this.nodeShutdowns;
}

/**
* Returns a map of target node name to replacement shutdown
*/
public Map<String, SingleNodeShutdownMetadata> replacementTargetShutdowns() {
return this.nodeReplacementTargets;
}

@SuppressWarnings("unchecked")
public <T extends ClusterState.Custom> T custom(String key) {
return (T) customs.get(key);
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -30,12 +31,12 @@
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.gateway.PriorityComparator;

import java.util.ArrayList;
@@ -47,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -671,7 +673,6 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
return MoveDecision.NOT_TAKEN;
}

final boolean explain = allocation.debugDecision();
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(shardRouting);
RoutingNode routingNode = sourceNode.getRoutingNode();
@@ -687,15 +688,29 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
MoveDecision moveDecision = decideMove(shardRouting, sourceNode, canRemain, this::decideCanAllocate);
if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) {
final SingleNodeShutdownMetadata shutdown = allocation.nodeShutdowns().get(shardRouting.currentNodeId());
final boolean shardsOnReplacedNode = shutdown != null &&
shutdown.getType().equals(SingleNodeShutdownMetadata.Type.REPLACE);
if (shardsOnReplacedNode) {
return decideMove(shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate);
}
}
return moveDecision;
}

private MoveDecision decideMove(ShardRouting shardRouting, ModelNode sourceNode, Decision remainDecision,
BiFunction<ShardRouting, RoutingNode, Decision> decider) {
final boolean explain = allocation.debugDecision();
Type bestDecision = Type.NO;
RoutingNode targetNode = null;
final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
int weightRanking = 0;
for (ModelNode currentNode : sorter.modelNodes) {
if (currentNode != sourceNode) {
RoutingNode target = currentNode.getRoutingNode();
// don't use canRebalance as we want hard filtering rules to apply. See #17698
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
Decision allocationDecision = decider.apply(shardRouting, target);
if (explain) {
nodeExplanationMap.add(new NodeAllocationResult(
currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking));
@@ -715,10 +730,19 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
}
}

return MoveDecision.cannotRemain(canRemain, AllocationDecision.fromDecisionType(bestDecision),
return MoveDecision.cannotRemain(remainDecision, AllocationDecision.fromDecisionType(bestDecision),
targetNode != null ? targetNode.node() : null, nodeExplanationMap);
}

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
// don't use canRebalance as we want hard filtering rules to apply. See #17698
return allocation.deciders().canAllocate(shardRouting, target, allocation);
}

private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, RoutingNode target) {
return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation);
}

/**
* Builds the internal model from all shards in the given
* {@link Iterable}. All shards in the {@link Iterable} must be assigned
Original file line number Diff line number Diff line change
@@ -104,4 +104,22 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
return decision;
}
}

/**
* Returns a {@link Decision} whether the given shard can be forced to the
* given node in the event that the shard's source node is being replaced.
* This allows nodes using a replace-type node shutdown to
* override certain deciders in the interest of moving the shard away from
* a node that *must* be removed.
*
* It defaults to returning "YES" and must be overridden by deciders that
* opt-out to having their other NO decisions *not* overridden while vacating.
*
* The caller is responsible for first checking:
* - that a replacement is ongoing
* - the shard routing's current node is the source of the replacement
*/
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
dakrone marked this conversation as resolved.
Show resolved Hide resolved
return Decision.YES;
}
}
Original file line number Diff line number Diff line change
@@ -212,6 +212,25 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
return ret;
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canForceAllocateDuringReplace(shardRouting, node, allocation);
// short track if a NO is returned.
if (decision.type() == Decision.Type.NO) {
if (allocation.debugDecision() == false) {
return Decision.NO;
} else {
ret.add(decision);
}
} else {
addDecision(ret, decision, allocation);
}
}
return ret;
}

private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
// We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
if (decision != Decision.ALWAYS
Original file line number Diff line number Diff line change
@@ -120,6 +120,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return underCapacity(shardRouting, node, allocation, true);
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// We need to meet the criteria for shard awareness even during a replacement so that all
// copies of a shard do not get allocated to the same host/rack/AZ, so this explicitly
// checks the awareness 'canAllocate' to ensure we don't violate that constraint.
return canAllocate(shardRouting, node, allocation);
dakrone marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, false);
Original file line number Diff line number Diff line change
@@ -317,6 +317,33 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
new ByteSizeValue(freeBytesAfterShard));
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ImmutableOpenMap<String, DiskUsage> usages = allocation.clusterInfo().getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
return decision;
}

if (allocation.metadata().index(shardRouting.index()).ignoreDiskWatermarks()) {
return YES_DISK_WATERMARKS_IGNORED;
}

final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
final long shardSize = getExpectedShardSize(shardRouting, 0L,
allocation.clusterInfo(), allocation.snapshotShardSizeInfo(), allocation.metadata(), allocation.routingTable());
assert shardSize >= 0 : shardSize;
final long freeBytesAfterShard = usage.getFreeBytes() - shardSize;
if (freeBytesAfterShard < 0) {
return Decision.single(Decision.Type.NO, NAME,
"unable to force allocate shard to [%s] during replacement, " +
"as allocating to this node would cause disk usage to exceed 100%% ([%s] bytes above available disk space)",
node.nodeId(), -freeBytesAfterShard);
} else {
return super.canForceAllocateDuringReplace(shardRouting, node, allocation);
}
}

private static final Decision YES_NOT_MOST_UTILIZED_DISK = Decision.single(Decision.Type.YES, NAME,
"this shard is not allocated on the most utilized disk and can remain");

Original file line number Diff line number Diff line change
@@ -8,8 +8,6 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import java.util.Locale;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -20,6 +18,8 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import java.util.Locale;

/**
* This allocation decider allows shard allocations / rebalancing via the cluster wide settings
* {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting
Original file line number Diff line number Diff line change
@@ -73,4 +73,9 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
// if so, we don't want to force the primary allocation here
return canAllocate(shardRouting, node, allocation);
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return canAllocate(shardRouting, node, allocation);
henningandersen marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading