Add node REPLACE shutdown implementation (#76247)
* WIP, basic implementation

* Pull `if` branch into a variable

* Remove outdated javadoc

* Remove map iteration, use target name instead of id (whoops)

* Remove streaming from isReplacementSource

* Simplify getReplacementName

* Only calculate node shutdowns if canRemain==false and forceMove==false

* Move canRebalance comment in BalancedShardsAllocator

* Rename canForceDuringVacate -> canForceAllocateDuringReplace

* Add comment to AwarenessAllocationDecider.canForceAllocateDuringReplace

* Revert changes to ClusterRebalanceAllocationDecider

* Change "no replacement" decision message in NodeReplacementAllocationDecider

* Only construct shutdown map once in isReplacementSource

* Make node shutdowns and target shutdowns available within RoutingAllocation

* Add randomization for adding the filter that is overridden in test

* Add integration test with replicas: 1

* Go nuts with the verbosity of allocation decisions

* Also check NODE_C in unit test

* Test with randomly assigned shard

* Fix test for extra verbose decision messages

* Remove canAllocate(IndexMetadata, RoutingNode, RoutingAllocation) overriding

* Spotless :|

* Implement 100% disk usage check during force-replace-allocate

* Add rudimentary documentation for "replace" shutdown type

* Use RoutingAllocation shutdown map in BalancedShardsAllocator

* Add canForceAllocateDuringReplace to AllocationDeciders & add test

* Switch from percentage to bytes in DiskThresholdDecider force check

* Enhance docs with note about rollover, creation, & shrink

* Clarify decision messages, add test for target-only allocation

* Simplify NodeReplacementAllocationDecider.replacementOngoing

* Start nodeC before nodeB in integration test

* Spotleeeessssssss! You get me every time!

* Remove outdated comment
dakrone authored Oct 7, 2021
1 parent f16a699 commit 6e875d0
Showing 26 changed files with 928 additions and 43 deletions.
15 changes: 13 additions & 2 deletions docs/reference/shutdown/apis/shutdown-put.asciidoc
@@ -26,7 +26,7 @@ Migrates ongoing tasks and index shards to other nodes as needed
to prepare a node to be restarted or shut down and removed from the cluster.
This ensures that {es} can be stopped safely with minimal disruption to the cluster.

You must specify the type of shutdown: `restart` or `remove`.
You must specify the type of shutdown: `restart`, `remove`, or `replace`.
If a node is already being prepared for shutdown,
you can use this API to change the shutdown type.

@@ -58,12 +58,16 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]

`type`::
(Required, string)
Valid values are `restart` and `remove`.
Valid values are `restart`, `remove`, or `replace`.
Use `restart` when you need to temporarily shut down a node to perform an upgrade,
make configuration changes, or perform other maintenance.
Because the node is expected to rejoin the cluster, data is not migrated off of the node.
Use `remove` when you need to permanently remove a node from the cluster.
The node is not marked ready for shutdown until data is migrated off of the node.
Use `replace` to do a 1:1 replacement of a node with another node. Certain allocation decisions will
be ignored (such as disk watermarks) in the interest of true replacement of the source node with the
target node. During a replace-type shutdown, rollover and index creation may result in unassigned
shards, and shrink may fail until the replacement is complete.

`reason`::
(Required, string)
@@ -76,6 +80,13 @@ it does not affect the shut down process.
Only valid if `type` is `restart`. Controls how long {es} will wait for the node to restart and join the cluster before reassigning its shards to other nodes. This works the same as
<<delayed-allocation,delaying allocation>> with the `index.unassigned.node_left.delayed_timeout` setting. If you specify both a restart allocation delay and an index-level allocation delay, the longer of the two is used.

`target_node_name`::
(Optional, string)
Only valid if `type` is `replace`. Specifies the name of the node that is replacing the node being
shut down. Shards from the shut down node are only allowed to be allocated to the target node, and
no other data will be allocated to the target node. During relocation of data, certain allocation
rules are ignored, such as disk watermarks or user attribute filtering rules.

[[put-shutdown-api-example]]
==== {api-examples-title}
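
As an illustration of the `type`, `reason`, and `target_node_name` parameters described above, here is a minimal Java sketch that assembles a request body for a replace-type shutdown. The class and method names are hypothetical, the JSON quoting is deliberately minimal (it escapes only backslashes and double quotes), and sending the request is left out entirely.

```java
// Hypothetical helper, not part of this commit: builds the JSON body only.
final class ReplaceShutdownBodySketch {

    // Minimal JSON string quoting: escapes only backslashes and double quotes.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // "type" is fixed to "replace"; "target_node_name" is only valid for that type.
    static String body(String reason, String targetNodeName) {
        return "{\"type\":\"replace\","
            + "\"reason\":" + quote(reason) + ","
            + "\"target_node_name\":" + quote(targetNodeName) + "}";
    }

    public static void main(String[] args) {
        System.out.println(body("replacing old hardware", "node-b"));
    }
}
```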


@@ -38,6 +38,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
@@ -49,7 +50,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@@ -60,6 +60,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestMetadata;
@@ -202,6 +203,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));

@@ -12,6 +12,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -114,7 +115,7 @@ private SingleNodeShutdownMetadata(
if (targetNodeName != null && type != Type.REPLACE) {
throw new IllegalArgumentException(new ParameterizedMessage("target node name is only valid for REPLACE type shutdowns, " +
"but was given type [{}] and target node name [{}]", type, targetNodeName).getFormattedMessage());
} else if (targetNodeName == null && type == Type.REPLACE) {
} else if (Strings.hasText(targetNodeName) == false && type == Type.REPLACE) {
throw new IllegalArgumentException("target node name is required for REPLACE type shutdowns");
}
this.targetNodeName = targetNodeName;
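
The effect of switching from a null check to `Strings.hasText` can be seen in a small standalone sketch: a blank or empty target node name is now rejected for REPLACE shutdowns, not just a missing one. The class below is hypothetical, and `hasText` is a local stand-in written for this sketch rather than the Elasticsearch helper itself.

```java
// Hypothetical, simplified mirror of the constructor validation shown in the hunk above.
final class ShutdownValidationSketch {
    enum Type { RESTART, REMOVE, REPLACE }

    // Local stand-in (assumption): true only if the string contains a non-whitespace character.
    static boolean hasText(String s) {
        return s != null && s.trim().isEmpty() == false;
    }

    static void validate(Type type, String targetNodeName) {
        if (targetNodeName != null && type != Type.REPLACE) {
            throw new IllegalArgumentException("target node name is only valid for REPLACE type shutdowns, "
                + "but was given type [" + type + "] and target node name [" + targetNodeName + "]");
        } else if (hasText(targetNodeName) == false && type == Type.REPLACE) {
            throw new IllegalArgumentException("target node name is required for REPLACE type shutdowns");
        }
    }

    // Convenience wrapper for the sketch: reports validity instead of throwing.
    static boolean isValid(Type type, String targetNodeName) {
        try {
            validate(type, targetNodeName);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

With the previous null-only check, `isValid(Type.REPLACE, "  ")` would have passed; with the text check it does not.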

@@ -12,6 +12,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNodes;
@@ -24,6 +25,7 @@
import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -71,6 +73,9 @@ public class RoutingAllocation {
nodesChangedObserver, indexMetadataUpdater, restoreInProgressUpdater
);

private final Map<String, SingleNodeShutdownMetadata> nodeShutdowns;
private final Map<String, SingleNodeShutdownMetadata> nodeReplacementTargets;


/**
* Creates a new {@link RoutingAllocation}
@@ -90,6 +95,14 @@ public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes,
this.clusterInfo = clusterInfo;
this.shardSizeInfo = shardSizeInfo;
this.currentNanoTime = currentNanoTime;
this.nodeShutdowns = metadata.nodeShutdowns();
Map<String, SingleNodeShutdownMetadata> targetNameToShutdown = new HashMap<>();
for (SingleNodeShutdownMetadata shutdown : this.nodeShutdowns.values()) {
if (shutdown.getType() == SingleNodeShutdownMetadata.Type.REPLACE) {
targetNameToShutdown.put(shutdown.getTargetNodeName(), shutdown);
}
}
this.nodeReplacementTargets = Collections.unmodifiableMap(targetNameToShutdown);
}

/** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */
@@ -145,6 +158,20 @@ public SnapshotShardSizeInfo snapshotShardSizeInfo() {
return shardSizeInfo;
}

/**
* Returns the map of node id to shutdown metadata currently in the cluster
*/
public Map<String, SingleNodeShutdownMetadata> nodeShutdowns() {
return this.nodeShutdowns;
}

/**
* Returns a map of target node name to replacement shutdown
*/
public Map<String, SingleNodeShutdownMetadata> replacementTargetShutdowns() {
return this.nodeReplacementTargets;
}
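
The two maps are keyed differently: `nodeShutdowns()` is keyed by the id of the node being shut down, while `replacementTargetShutdowns()` is keyed by the name of the replacement node. A minimal sketch of how the second map is derived from the first, using a hypothetical `Shutdown` class in place of `SingleNodeShutdownMetadata`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the constructor logic in the hunk above.
final class ReplacementTargetsSketch {
    enum Type { RESTART, REMOVE, REPLACE }

    // Stand-in for SingleNodeShutdownMetadata (assumption: only these three fields matter here).
    static final class Shutdown {
        final String nodeId;
        final Type type;
        final String targetNodeName;

        Shutdown(String nodeId, Type type, String targetNodeName) {
            this.nodeId = nodeId;
            this.type = type;
            this.targetNodeName = targetNodeName;
        }
    }

    // Input is keyed by source node id; output is keyed by target node name, REPLACE entries only.
    static Map<String, Shutdown> replacementTargets(Map<String, Shutdown> nodeShutdowns) {
        Map<String, Shutdown> byTargetName = new HashMap<>();
        for (Shutdown shutdown : nodeShutdowns.values()) {
            if (shutdown.type == Type.REPLACE) {
                byTargetName.put(shutdown.targetNodeName, shutdown);
            }
        }
        return Collections.unmodifiableMap(byTargetName);
    }
}
```

Building the map once in the constructor means each decider can do a constant-time lookup instead of iterating over all shutdowns for every decision.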

@SuppressWarnings("unchecked")
public <T extends ClusterState.Custom> T custom(String key) {
return (T) customs.get(key);

@@ -14,6 +14,7 @@
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -30,12 +31,12 @@
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.gateway.PriorityComparator;

import java.util.ArrayList;
@@ -47,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -671,7 +673,6 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
return MoveDecision.NOT_TAKEN;
}

final boolean explain = allocation.debugDecision();
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(shardRouting);
RoutingNode routingNode = sourceNode.getRoutingNode();
@@ -687,15 +688,29 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
* The result is not guaranteed to be balanced after this operation; we still make a best effort to
* allocate on the minimal eligible node.
*/
MoveDecision moveDecision = decideMove(shardRouting, sourceNode, canRemain, this::decideCanAllocate);
if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) {
final SingleNodeShutdownMetadata shutdown = allocation.nodeShutdowns().get(shardRouting.currentNodeId());
final boolean shardsOnReplacedNode = shutdown != null &&
shutdown.getType().equals(SingleNodeShutdownMetadata.Type.REPLACE);
if (shardsOnReplacedNode) {
return decideMove(shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate);
}
}
return moveDecision;
}

private MoveDecision decideMove(ShardRouting shardRouting, ModelNode sourceNode, Decision remainDecision,
BiFunction<ShardRouting, RoutingNode, Decision> decider) {
final boolean explain = allocation.debugDecision();
Type bestDecision = Type.NO;
RoutingNode targetNode = null;
final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
int weightRanking = 0;
for (ModelNode currentNode : sorter.modelNodes) {
if (currentNode != sourceNode) {
RoutingNode target = currentNode.getRoutingNode();
// don't use canRebalance as we want hard filtering rules to apply. See #17698
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
Decision allocationDecision = decider.apply(shardRouting, target);
if (explain) {
nodeExplanationMap.add(new NodeAllocationResult(
currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking));
@@ -715,10 +730,19 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
}
}

return MoveDecision.cannotRemain(canRemain, AllocationDecision.fromDecisionType(bestDecision),
return MoveDecision.cannotRemain(remainDecision, AllocationDecision.fromDecisionType(bestDecision),
targetNode != null ? targetNode.node() : null, nodeExplanationMap);
}

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
// don't use canRebalance as we want hard filtering rules to apply. See #17698
return allocation.deciders().canAllocate(shardRouting, target, allocation);
}

private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, RoutingNode target) {
return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation);
}
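
The refactored `decideMove` amounts to a two-pass search: try the regular `canAllocate` deciders first, and only if no node accepts the shard and the source node is under a REPLACE shutdown, retry with `canForceAllocateDuringReplace`. The sketch below is a simplification under stated assumptions: shards and nodes are plain strings, deciders are `BiPredicate`s, and throttling, weight ordering, and explanations are omitted. All names are hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

// Hypothetical, simplified model of the two-pass move decision; assumes the shard cannot remain.
final class DecideMoveSketch {

    // Returns the first node (other than the source) that the given decider accepts.
    static Optional<String> firstAccepting(String shard, String sourceNode, List<String> nodes,
                                           BiPredicate<String, String> decider) {
        for (String node : nodes) {
            if (node.equals(sourceNode) == false && decider.test(shard, node)) {
                return Optional.of(node);
            }
        }
        return Optional.empty();
    }

    static Optional<String> decideMove(String shard, String sourceNode, List<String> nodes,
                                       boolean sourceIsBeingReplaced,
                                       BiPredicate<String, String> canAllocate,
                                       BiPredicate<String, String> canForceAllocateDuringReplace) {
        Optional<String> target = firstAccepting(shard, sourceNode, nodes, canAllocate);
        // Fall back to the force check only when the regular pass found nothing
        // and the source node is the subject of a REPLACE shutdown.
        if (target.isPresent() == false && sourceIsBeingReplaced) {
            return firstAccepting(shard, sourceNode, nodes, canForceAllocateDuringReplace);
        }
        return target;
    }
}
```

Passing the decider as a function keeps a single loop for both passes, which is the role `BiFunction<ShardRouting, RoutingNode, Decision>` plays in the actual change.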

/**
* Builds the internal model from all shards in the given
* {@link Iterable}. All shards in the {@link Iterable} must be assigned

@@ -104,4 +104,22 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
return decision;
}
}

/**
* Returns a {@link Decision} whether the given shard can be forced to the
* given node in the event that the shard's source node is being replaced.
* This allows nodes using a replace-type node shutdown to
* override certain deciders in the interest of moving the shard away from
* a node that *must* be removed.
*
* It defaults to returning "YES" and must be overridden by deciders whose NO
* decisions must still be honored, rather than overridden, while vacating.
*
* The caller is responsible for first checking:
* - that a replacement is ongoing
* - the shard routing's current node is the source of the replacement
*/
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.YES;
}
}
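
The default-YES contract means a decider's ordinary NO is bypassed during a replacement unless the decider explicitly opts out by overriding the new method. A minimal sketch of both cases, with hypothetical class names and a two-value `Decision` enum standing in for the real `Decision` type:

```java
// Hypothetical, simplified model of the opt-out contract described in the javadoc above.
final class ForceAllocateOptOutSketch {
    enum Decision { YES, NO }

    static class Decider {
        Decision canAllocate(String shard, String node) {
            return Decision.YES;
        }

        // Default: do not stand in the way of a forced allocation during replacement.
        Decision canForceAllocateDuringReplace(String shard, String node) {
            return Decision.YES;
        }
    }

    // Says NO normally, but inherits the default, so its NO is overridden during replacement.
    static final class OverridableDecider extends Decider {
        @Override
        Decision canAllocate(String shard, String node) {
            return Decision.NO;
        }
    }

    // Opts out: delegates the force check to canAllocate, so its NO still applies.
    static final class StrictDecider extends Decider {
        @Override
        Decision canAllocate(String shard, String node) {
            return Decision.NO;
        }

        @Override
        Decision canForceAllocateDuringReplace(String shard, String node) {
            return canAllocate(shard, node);
        }
    }
}
```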

@@ -212,6 +212,25 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
return ret;
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canForceAllocateDuringReplace(shardRouting, node, allocation);
// short circuit if a NO is returned.
if (decision.type() == Decision.Type.NO) {
if (allocation.debugDecision() == false) {
return Decision.NO;
} else {
ret.add(decision);
}
} else {
addDecision(ret, decision, allocation);
}
}
return ret;
}
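
The aggregate check follows the same pattern as the other composite methods in this class: outside debug mode the first NO ends the loop, while in debug mode every decider is consulted so all explanations can be reported. A self-contained sketch of that control flow, using hypothetical types (the real code accumulates into `Decision.Multi`):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the composite force-allocate check.
final class CompositeForceDecisionSketch {

    static final class Decision {
        final boolean yes;
        final String label;

        Decision(boolean yes, String label) {
            this.yes = yes;
            this.label = label;
        }
    }

    interface Decider {
        Decision canForceAllocateDuringReplace(String shard, String node);
    }

    // Returns the decisions that were recorded; an empty list means "no objections".
    static List<Decision> evaluate(List<Decider> deciders, String shard, String node, boolean debug) {
        List<Decision> collected = new ArrayList<>();
        for (Decider decider : deciders) {
            Decision decision = decider.canForceAllocateDuringReplace(shard, node);
            if (decision.yes == false && debug == false) {
                // Short circuit: later deciders are never consulted.
                return Collections.singletonList(decision);
            }
            if (debug) {
                // Debug mode records every decision, YES and NO alike.
                collected.add(decision);
            }
        }
        return collected;
    }

    static boolean allowed(List<Decision> decisions) {
        for (Decision decision : decisions) {
            if (decision.yes == false) {
                return false;
            }
        }
        return true;
    }
}
```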

private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
// We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
if (decision != Decision.ALWAYS

@@ -120,6 +120,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return underCapacity(shardRouting, node, allocation, true);
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// We need to meet the criteria for shard awareness even during a replacement so that all
// copies of a shard do not get allocated to the same host/rack/AZ, so this explicitly
// checks the awareness 'canAllocate' to ensure we don't violate that constraint.
return canAllocate(shardRouting, node, allocation);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, false);

@@ -317,6 +317,33 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
new ByteSizeValue(freeBytesAfterShard));
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ImmutableOpenMap<String, DiskUsage> usages = allocation.clusterInfo().getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
return decision;
}

if (allocation.metadata().index(shardRouting.index()).ignoreDiskWatermarks()) {
return YES_DISK_WATERMARKS_IGNORED;
}

final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
final long shardSize = getExpectedShardSize(shardRouting, 0L,
allocation.clusterInfo(), allocation.snapshotShardSizeInfo(), allocation.metadata(), allocation.routingTable());
assert shardSize >= 0 : shardSize;
final long freeBytesAfterShard = usage.getFreeBytes() - shardSize;
if (freeBytesAfterShard < 0) {
return Decision.single(Decision.Type.NO, NAME,
"unable to force allocate shard to [%s] during replacement, " +
"as allocating to this node would cause disk usage to exceed 100%% ([%s] bytes above available disk space)",
node.nodeId(), -freeBytesAfterShard);
} else {
return super.canForceAllocateDuringReplace(shardRouting, node, allocation);
}
}
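
During a forced replacement the watermarks are skipped, but the shard must still physically fit: the check is on absolute bytes, not on a percentage. The arithmetic can be sketched as follows, with hypothetical method names and plain `long` values in place of `DiskUsageWithRelocations` and the expected-shard-size lookup:

```java
// Hypothetical, simplified model of the byte-based check in the hunk above.
final class ForceAllocateDiskCheckSketch {

    // How many bytes the shard would exceed the node's free space by (0 if it fits).
    static long bytesOverCapacity(long freeBytes, long expectedShardSize) {
        long freeBytesAfterShard = freeBytes - expectedShardSize;
        return freeBytesAfterShard < 0 ? -freeBytesAfterShard : 0L;
    }

    // Forced allocation is allowed only when the shard fits in the remaining free space.
    static boolean canForceAllocate(long freeBytes, long expectedShardSize) {
        return bytesOverCapacity(freeBytes, expectedShardSize) == 0L;
    }

    public static void main(String[] args) {
        System.out.println(canForceAllocate(100L, 100L));  // shard exactly fills the disk
        System.out.println(bytesOverCapacity(100L, 150L)); // shard is larger than free space
    }
}
```

A shard that exactly fills the remaining space is still accepted, since the condition in the hunk is `freeBytesAfterShard < 0`.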

private static final Decision YES_NOT_MOST_UTILIZED_DISK = Decision.single(Decision.Type.YES, NAME,
"this shard is not allocated on the most utilized disk and can remain");


@@ -8,8 +8,6 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import java.util.Locale;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -20,6 +18,8 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import java.util.Locale;

/**
* This allocation decider allows shard allocations / rebalancing via the cluster wide settings
* {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting

@@ -73,4 +73,9 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
// if so, we don't want to force the primary allocation here
return canAllocate(shardRouting, node, allocation);
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return canAllocate(shardRouting, node, allocation);
}
}