From 7254c736b13c5f7cba8278b7743bc5ed7e6d33b3 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Thu, 7 Oct 2021 10:07:46 -0600
Subject: [PATCH] Add node REPLACE shutdown implementation (#76247)

* WIP, basic implementation
* Pull `if` branch into a variable
* Remove outdated javadoc
* Remove map iteration, use target name instead of id (whoops)
* Remove streaming from isReplacementSource
* Simplify getReplacementName
* Only calculate node shutdowns if canRemain==false and forceMove==false
* Move canRebalance comment in BalancedShardsAllocator
* Rename canForceDuringVacate -> canForceAllocateDuringReplace
* Add comment to AwarenessAllocationDecider.canForceAllocateDuringReplace
* Revert changes to ClusterRebalanceAllocationDecider
* Change "no replacement" decision message in NodeReplacementAllocationDecider
* Only construct shutdown map once in isReplacementSource
* Make node shutdowns and target shutdowns available within RoutingAllocation
* Add randomization for adding the filter that is overridden in test
* Add integration test with replicas: 1
* Go nuts with the verbosity of allocation decisions
* Also check NODE_C in unit test
* Test with randomly assigned shard
* Fix test for extra verbose decision messages
* Remove canAllocate(IndexMetadata, RoutingNode, RoutingAllocation) overriding
* Spotless :|
* Implement 100% disk usage check during force-replace-allocate
* Add rudimentary documentation for "replace" shutdown type
* Use RoutingAllocation shutdown map in BalancedShardsAllocator
* Add canForceAllocateDuringReplace to AllocationDeciders & add test
* Switch from percentage to bytes in DiskThresholdDecider force check
* Enhance docs with note about rollover, creation, & shrink
* Clarify decision messages, add test for target-only allocation
* Simplify NodeReplacementAllocationDecider.replacementOngoing
* Start nodeC before nodeB in integration test
* Spotleeeessssssss! You get me every time!
* Remove outdated comment

# Conflicts:
#	x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java
---
 .../shutdown/apis/shutdown-put.asciidoc       |  15 +-
 .../elasticsearch/cluster/ClusterModule.java  |   4 +-
 .../metadata/SingleNodeShutdownMetadata.java  |   3 +-
 .../routing/allocation/RoutingAllocation.java |  27 ++
 .../allocator/BalancedShardsAllocator.java    |  34 ++-
 .../allocation/decider/AllocationDecider.java |  18 ++
 .../decider/AllocationDeciders.java           |  19 ++
 .../decider/AwarenessAllocationDecider.java   |   8 +
 .../decider/DiskThresholdDecider.java         |  27 ++
 .../decider/EnableAllocationDecider.java      |   4 +-
 .../decider/MaxRetryAllocationDecider.java    |   5 +
 .../NodeReplacementAllocationDecider.java     | 154 +++++++++++
 .../decider/NodeVersionAllocationDecider.java |   5 +
 ...caAfterPrimaryActiveAllocationDecider.java |   5 +
 .../decider/ResizeAllocationDecider.java      |   5 +
 .../RestoreInProgressAllocationDecider.java   |   5 +
 .../decider/SameShardAllocationDecider.java   |   5 +
 .../SnapshotInProgressAllocationDecider.java  |   5 +
 .../decider/ThrottlingAllocationDecider.java  |   5 +
 .../cluster/ClusterModuleTests.java           |   2 +
 .../DiskThresholdDeciderUnitTests.java        |  51 ++++
 ...NodeReplacementAllocationDeciderTests.java | 237 ++++++++++++++++
 .../cluster/ESAllocationTestCase.java         |   4 +
 .../core/ilm/SetSingleNodeAllocateStep.java   |   4 +-
 .../xpack/shutdown/NodeShutdownShardsIT.java  | 259 +++++++++++++++++-
 ...TransportGetShutdownStatusActionTests.java |  61 +++--
 26 files changed, 929 insertions(+), 42 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java
 create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java

diff --git a/docs/reference/shutdown/apis/shutdown-put.asciidoc b/docs/reference/shutdown/apis/shutdown-put.asciidoc
index e4a5a3af73233..6d6c09fe93826 100644
--- a/docs/reference/shutdown/apis/shutdown-put.asciidoc
+++ b/docs/reference/shutdown/apis/shutdown-put.asciidoc
@@ -26,7 +26,7 @@
 Migrates ongoing tasks and index shards to other nodes as needed to prepare a node to be
 restarted or shut down and removed from the cluster. This ensures that {es} can be stopped
 safely with minimal disruption to the cluster.
-You must specify the type of shutdown: `restart` or `remove`.
+You must specify the type of shutdown: `restart`, `remove`, or `replace`.
 If a node is already being prepared for shutdown, you can use this API to change the
 shutdown type.
@@ -58,12 +58,16 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]

 `type`::
 (Required, string)
-Valid values are `restart` and `remove`.
+Valid values are `restart`, `remove`, or `replace`.
 Use `restart` when you need to temporarily shut down a node to perform an upgrade, make
 configuration changes, or perform other maintenance. Because the node is expected to rejoin
 the cluster, data is not migrated off of the node.
 Use `remove` when you need to permanently remove a node from the cluster. The node is not
 marked ready for shutdown until data is migrated off of the node.
+Use `replace` to do a 1:1 replacement of a node with another node. Certain allocation decisions will
+be ignored (such as disk watermarks) in the interest of true replacement of the source node with the
+target node. During a replace-type shutdown, rollover and index creation may result in unassigned
+shards, and shrink may fail until the replacement is complete.

 `reason`::
 (Required, string)
@@ -76,6 +80,13 @@
 it does not affect the shut down process. Only valid if `type` is `restart`.
 Controls how long {es} will wait for the node to restart and join the cluster before
 reassigning its shards to other nodes. This works the same as <> with the
 `index.unassigned.node_left.delayed_timeout` setting.
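Reviewer note: as a sketch of how the `replace` type and `target_node_name` parameter documented above fit together, a request might look like the following (the node ID `node-a` and node name `node-b` are hypothetical placeholders, not values from this patch):

```console
PUT _nodes/node-a/shutdown
{
  "type": "replace",
  "reason": "replacing node-a with node-b due to failing hardware",
  "target_node_name": "node-b"
}
```

Note that `target_node_name` takes the replacement node's *name*, while the URL path takes the shut-down node's *ID*; the deciders below are keyed the same way.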
 If you specify both a restart allocation delay and an index-level allocation delay, the longer
 of the two is used.

+`target_node_name`::
+(Optional, string)
+Only valid if `type` is `replace`. Specifies the name of the node that is replacing the node being
+shut down. Shards from the shut down node are only allowed to be allocated to the target node, and
+no other data will be allocated to the target node. During relocation of data certain allocation
+rules are ignored, such as disk watermarks or user attribute filtering rules.
+
 [[put-shutdown-api-example]]
 ==== {api-examples-title}
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
index 9c82b3dcfb0c6..5f64fcbf8c8cc 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -38,6 +38,7 @@
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
@@ -49,7 +50,6 @@
 import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@@ -61,6 +61,7 @@
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.ingest.IngestMetadata;
@@ -234,6 +235,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
         addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
         addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
         addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
+        addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
         addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
         addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
         addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java
index d2cf25f219303..ddd8e1bb2ad34 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.Diffable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -114,7 +115,7 @@ private SingleNodeShutdownMetadata(
         if (targetNodeName != null && type != Type.REPLACE) {
             throw new IllegalArgumentException(new ParameterizedMessage("target node name is only valid for REPLACE type shutdowns, " +
                 "but was given type [{}] and target node name [{}]", type, targetNodeName).getFormattedMessage());
-        } else if (targetNodeName == null && type == Type.REPLACE) {
+        } else if (Strings.hasText(targetNodeName) == false && type == Type.REPLACE) {
             throw new IllegalArgumentException("target node name is required for REPLACE type shutdowns");
         }
         this.targetNodeName = targetNodeName;
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
index 6c03c6fc2a03d..8422edbe95fb7 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingChangesObserver;
 import org.elasticsearch.cluster.routing.RoutingNodes;
@@ -24,6 +25,7 @@
 import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater;
 import org.elasticsearch.snapshots.SnapshotShardSizeInfo;

+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -72,6 +74,9 @@
         nodesChangedObserver, indexMetadataUpdater, restoreInProgressUpdater
     );

+    private final Map<String, SingleNodeShutdownMetadata> nodeShutdowns;
+    private final Map<String, SingleNodeShutdownMetadata> nodeReplacementTargets;
+
     /**
      * Creates a new {@link RoutingAllocation}
@@ -91,6 +96,14 @@ public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes,
         this.clusterInfo = clusterInfo;
         this.shardSizeInfo = shardSizeInfo;
         this.currentNanoTime = currentNanoTime;
+        this.nodeShutdowns = metadata.nodeShutdowns();
+        Map<String, SingleNodeShutdownMetadata> targetNameToShutdown = new HashMap<>();
+        for (SingleNodeShutdownMetadata shutdown : this.nodeShutdowns.values()) {
+            if (shutdown.getType() == SingleNodeShutdownMetadata.Type.REPLACE) {
+                targetNameToShutdown.put(shutdown.getTargetNodeName(), shutdown);
+            }
+        }
+        this.nodeReplacementTargets = Collections.unmodifiableMap(targetNameToShutdown);
     }

     /** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */
@@ -146,6 +159,20 @@ public SnapshotShardSizeInfo snapshotShardSizeInfo() {
         return shardSizeInfo;
     }

+    /**
+     * Returns the map of node id to shutdown metadata currently in the cluster
+     */
+    public Map<String, SingleNodeShutdownMetadata> nodeShutdowns() {
+        return this.nodeShutdowns;
+    }
+
+    /**
+     * Returns a map of target node name to replacement shutdown
+     */
+    public Map<String, SingleNodeShutdownMetadata> replacementTargetShutdowns() {
+        return this.nodeReplacementTargets;
+    }
+
     @SuppressWarnings("unchecked")
     public <T extends ClusterState.Custom> T custom(String key) {
         return (T) customs.get(key);
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index 0b223d0e69c3d..83366156a147e 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -14,6 +14,7 @@
 import org.apache.lucene.util.IntroSorter;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -30,12 +31,12 @@
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
 import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.gateway.PriorityComparator;

 import java.util.ArrayList;
@@ -47,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.stream.StreamSupport;

 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -671,7 +673,6 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
             return MoveDecision.NOT_TAKEN;
         }

-        final boolean explain = allocation.debugDecision();
         final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
         assert sourceNode != null && sourceNode.containsShard(shardRouting);
         RoutingNode routingNode = sourceNode.getRoutingNode();
@@ -687,6 +688,21 @@ public MoveDecision decideMove(final ShardRouting shardRouting) {
          * This is not guaranteed to be balanced after this operation we still try best effort to
          * allocate on the minimal eligible node.
          */
+        MoveDecision moveDecision = decideMove(shardRouting, sourceNode, canRemain, this::decideCanAllocate);
+        if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) {
+            final SingleNodeShutdownMetadata shutdown = allocation.nodeShutdowns().get(shardRouting.currentNodeId());
+            final boolean shardsOnReplacedNode = shutdown != null &&
+                shutdown.getType().equals(SingleNodeShutdownMetadata.Type.REPLACE);
+            if (shardsOnReplacedNode) {
+                return decideMove(shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate);
+            }
+        }
+        return moveDecision;
+    }
+
+    private MoveDecision decideMove(ShardRouting shardRouting, ModelNode sourceNode, Decision remainDecision,
+                                    BiFunction<ShardRouting, RoutingNode, Decision> decider) {
+        final boolean explain = allocation.debugDecision();
         Type bestDecision = Type.NO;
         RoutingNode targetNode = null;
         final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
@@ -694,8 +710,7 @@
         for (ModelNode currentNode : sorter.modelNodes) {
             if (currentNode != sourceNode) {
                 RoutingNode target = currentNode.getRoutingNode();
-                // don't use canRebalance as we want hard filtering rules to apply. See #17698
-                Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
+                Decision allocationDecision = decider.apply(shardRouting, target);
                 if (explain) {
                     nodeExplanationMap.add(new NodeAllocationResult(
                         currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking));
@@ -715,10 +730,19 @@
             }
         }

-        return MoveDecision.cannotRemain(canRemain, AllocationDecision.fromDecisionType(bestDecision),
+        return MoveDecision.cannotRemain(remainDecision, AllocationDecision.fromDecisionType(bestDecision),
             targetNode != null ? targetNode.node() : null, nodeExplanationMap);
     }

+    private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
+        // don't use canRebalance as we want hard filtering rules to apply. See #17698
+        return allocation.deciders().canAllocate(shardRouting, target, allocation);
+    }
+
+    private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, RoutingNode target) {
+        return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation);
+    }
+
     /**
      * Builds the internal model from all shards in the given
      * {@link Iterable}. All shards in the {@link Iterable} must be assigned
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java
index af2a67b7d9b8d..5631fd2db78e0 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java
@@ -104,4 +104,22 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
             return decision;
         }
     }
+
+    /**
+     * Returns a {@link Decision} whether the given shard can be forced to the
+     * given node in the event that the shard's source node is being replaced.
+     * This allows nodes using a replace-type node shutdown to
+     * override certain deciders in the interest of moving the shard away from
+     * a node that *must* be removed.
+     *
+     * It defaults to returning "YES" and must be overridden by deciders that
+     * opt-out to having their other NO decisions *not* overridden while vacating.
+     *
+     * The caller is responsible for first checking:
+     * - that a replacement is ongoing
+     * - the shard routing's current node is the source of the replacement
+     */
+    public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        return Decision.YES;
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java
index 7ea06bb119047..5f5f361ec5c19 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java
@@ -212,6 +212,25 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
         return ret;
     }

+    @Override
+    public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        Decision.Multi ret = new Decision.Multi();
+        for (AllocationDecider allocationDecider : allocations) {
+            Decision decision = allocationDecider.canForceAllocateDuringReplace(shardRouting, node, allocation);
+            // short track if a NO is returned.
+            if (decision.type() == Decision.Type.NO) {
+                if (allocation.debugDecision() == false) {
+                    return Decision.NO;
+                } else {
+                    ret.add(decision);
+                }
+            } else {
+                addDecision(ret, decision, allocation);
+            }
+        }
+        return ret;
+    }
+
     private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
         // We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
         if (decision != Decision.ALWAYS
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
index d59e215161bb7..41093ec8da0b7 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
@@ -120,6 +120,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
         return underCapacity(shardRouting, node, allocation, true);
     }

+    @Override
+    public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        // We need to meet the criteria for shard awareness even during a replacement so that all
+        // copies of a shard do not get allocated to the same host/rack/AZ, so this explicitly
+        // checks the awareness 'canAllocate' to ensure we don't violate that constraint.
+        return canAllocate(shardRouting, node, allocation);
+    }
+
     @Override
     public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         return underCapacity(shardRouting, node, allocation, false);
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
index ad99d471604e4..e8d0e56ffaebc 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -323,6 +323,33 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
             new ByteSizeValue(freeBytesAfterShard));
     }

+    @Override
+    public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        ImmutableOpenMap<String, DiskUsage> usages = allocation.clusterInfo().getNodeMostAvailableDiskUsages();
+        final Decision decision = earlyTerminate(allocation, usages);
+        if (decision != null) {
+            return decision;
+        }
+
+        if (allocation.metadata().index(shardRouting.index()).ignoreDiskWatermarks()) {
+            return YES_DISK_WATERMARKS_IGNORED;
+        }
+
+        final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
+        final long shardSize = getExpectedShardSize(shardRouting, 0L,
+            allocation.clusterInfo(), allocation.snapshotShardSizeInfo(), allocation.metadata(), allocation.routingTable());
+        assert shardSize >= 0 : shardSize;
+        final long freeBytesAfterShard = usage.getFreeBytes() - shardSize;
+        if (freeBytesAfterShard < 0) {
+            return Decision.single(Decision.Type.NO, NAME,
+                "unable to force allocate shard to [%s] during replacement, " +
+                    "as allocating to this node would cause disk usage to exceed 100%% ([%s] bytes above available disk space)",
+                node.nodeId(), -freeBytesAfterShard);
+        } else {
+            return super.canForceAllocateDuringReplace(shardRouting, node, allocation);
+        }
+    }
+
     private static final Decision YES_NOT_MOST_UTILIZED_DISK = Decision.single(Decision.Type.YES, NAME,
         "this shard is not allocated on the most utilized disk and can remain");
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
index 329775afaf0b7..455e26a1f6096 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
@@ -8,8 +8,6 @@

 package org.elasticsearch.cluster.routing.allocation.decider;

-import java.util.Locale;
-
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RoutingNode;
@@ -20,6 +18,8 @@
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;

+import java.util.Locale;
+
 /**
  * This allocation decider allows shard allocations / rebalancing via the cluster wide settings
  * {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java
index ff76a1f3f1bdf..46c7b80f70536 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java
@@ -73,4 +73,9 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
         // if so, we don't want to force the primary allocation here
         return canAllocate(shardRouting, node, allocation);
     }
+
+    @Override
+    public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        return canAllocate(shardRouting, node, allocation);
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java
new file mode 100644
index 0000000000000..623838bd64b81
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; + +import java.util.Map; +import java.util.Optional; + +public class NodeReplacementAllocationDecider extends AllocationDecider { + + public static final String NAME = "node_replacement"; + + static final Decision NO_REPLACEMENTS = Decision.single(Decision.Type.YES, NAME, + "neither the source nor target node are part of an ongoing node replacement (no replacements)"); + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (replacementOngoing(allocation) == false) { + return NO_REPLACEMENTS; + } else if (replacementFromSourceToTarget(allocation, shardRouting.currentNodeId(), node.node().getName())) { + return Decision.single(Decision.Type.YES, NAME, + "node [%s] is replacing node [%s], and may receive shards from it", shardRouting.currentNodeId(), node.nodeId()); + } else if (isReplacementSource(allocation, shardRouting.currentNodeId())) { + return Decision.single(Decision.Type.NO, NAME, + "node [%s] is being replaced, and its shards may only be allocated to the replacement target [%s]", + shardRouting.currentNodeId(), getReplacementName(allocation, shardRouting.currentNodeId())); + } else if (isReplacementSource(allocation, node.nodeId())) { + return Decision.single(Decision.Type.NO, NAME, + "node [%s] is being replaced by [%s], so no data may be allocated to it", + node.nodeId(), getReplacementName(allocation, node.nodeId()), shardRouting.currentNodeId()); + } else if (isReplacementTargetName(allocation, node.node().getName())) { + final SingleNodeShutdownMetadata 
shutdown = allocation.replacementTargetShutdowns().get(node.node().getName()); + return Decision.single(Decision.Type.NO, NAME, + "node [%s] is replacing the vacating node [%s], only data currently allocated to the source node " + + "may be allocated to it until the replacement is complete", + node.nodeId(), shutdown == null ? null : shutdown.getNodeId(), shardRouting.currentNodeId()); + } else { + return Decision.single(Decision.Type.YES, NAME, + "neither the source nor target node are part of an ongoing node replacement"); + } + } + + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (replacementOngoing(allocation) == false) { + return NO_REPLACEMENTS; + } else if (isReplacementSource(allocation, node.nodeId())) { + return Decision.single(Decision.Type.NO, NAME, + "node [%s] is being replaced by node [%s], so no data may remain on it", node.nodeId(), + getReplacementName(allocation, node.nodeId())); + } else { + return Decision.single(Decision.Type.YES, NAME, "node [%s] is not being replaced", node.nodeId()); + } + } + + @Override + public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + if (replacementOngoing(allocation) == false) { + return NO_REPLACEMENTS; + } else if (isReplacementTargetName(allocation, node.getName())) { + final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.getName()); + return Decision.single(Decision.Type.NO, NAME, + "node [%s] is a node replacement target for node [%s], " + + "shards cannot auto expand to be on it until the replacement is complete", + node.getId(), shutdown == null ? 
null : shutdown.getNodeId()); + } else if (isReplacementSource(allocation, node.getId())) { + return Decision.single(Decision.Type.NO, NAME, + "node [%s] is being replaced by [%s], shards cannot auto expand to be on it", + node.getId(), getReplacementName(allocation, node.getId())); + } else { + return Decision.single(Decision.Type.YES, NAME, + "node is not part of a node replacement, so shards may be auto expanded onto it"); + } + } + + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (replacementFromSourceToTarget(allocation, shardRouting.currentNodeId(), node.node().getName())) { + return Decision.single(Decision.Type.YES, NAME, + "node [%s] is being replaced by node [%s], and can be force vacated to the target", + shardRouting.currentNodeId(), node.nodeId()); + } else { + return Decision.single(Decision.Type.NO, NAME, + "shard is not on the source of a node replacement relocated to the replacement target"); + } + } + + /** + * Returns true if there are any node replacements ongoing in the cluster + */ + private static boolean replacementOngoing(RoutingAllocation allocation) { + return allocation.replacementTargetShutdowns().isEmpty() == false; + } + + /** + * Returns true if there is a replacement currently ongoing from the source to the target node id + */ + private static boolean replacementFromSourceToTarget(RoutingAllocation allocation, String sourceNodeId, String targetNodeName) { + if (replacementOngoing(allocation) == false) { + return false; + } + if (sourceNodeId == null || targetNodeName == null) { + return false; + } + final SingleNodeShutdownMetadata shutdown = allocation.nodeShutdowns().get(sourceNodeId); + return shutdown != null && + shutdown.getType().equals(SingleNodeShutdownMetadata.Type.REPLACE) && + shutdown.getNodeId().equals(sourceNodeId) && + shutdown.getTargetNodeName().equals(targetNodeName); + } + + /** + * Returns true if the given node id is 
the source (the replaced node) of an ongoing node replacement + */ + private static boolean isReplacementSource(RoutingAllocation allocation, String nodeId) { + if (nodeId == null || replacementOngoing(allocation) == false) { + return false; + } + final Map<String, SingleNodeShutdownMetadata> nodeShutdowns = allocation.nodeShutdowns(); + return nodeShutdowns.containsKey(nodeId) && nodeShutdowns.get(nodeId).getType().equals(SingleNodeShutdownMetadata.Type.REPLACE); + } + + /** + * Returns true if the given node name (not the id!) is the target (the replacing node) of an ongoing node replacement + */ + private static boolean isReplacementTargetName(RoutingAllocation allocation, String nodeName) { + if (nodeName == null || replacementOngoing(allocation) == false) { + return false; + } + return allocation.replacementTargetShutdowns().get(nodeName) != null; + } + + private static String getReplacementName(RoutingAllocation allocation, String nodeIdBeingReplaced) { + if (nodeIdBeingReplaced == null || replacementOngoing(allocation) == false) { + return null; + } + return Optional.ofNullable(allocation.nodeShutdowns().get(nodeIdBeingReplaced)) + .filter(shutdown -> shutdown.getType().equals(SingleNodeShutdownMetadata.Type.REPLACE)) + .map(SingleNodeShutdownMetadata::getTargetNodeName) + .orElse(null); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java index 9259361ee3c3b..f1fcb230de8f7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java @@ -53,6 +53,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing } } + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node,
RoutingAllocation allocation) { + return canAllocate(shardRouting, node, allocation); + } + private Decision isVersionCompatibleRelocatePrimary(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, final RoutingAllocation allocation) { final RoutingNode source = routingNodes.node(sourceNodeId); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java index 4020f46ec0f8e..d395cd9020b1e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java @@ -35,4 +35,9 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat } return allocation.decision(Decision.YES, NAME, "primary shard for this replica is already active"); } + + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return canAllocate(shardRouting, node, allocation); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ResizeAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ResizeAllocationDecider.java index 6be85f4df6fe5..accb55921b43d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ResizeAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ResizeAllocationDecider.java @@ -75,4 +75,9 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting; return 
canAllocate(shardRouting, node, allocation); } + + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return canAllocate(shardRouting, node, allocation); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java index 0ee7227cc9df5..3b61ca2a4a88b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java @@ -69,4 +69,9 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting; return canAllocate(shardRouting, node, allocation); } + + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return canAllocate(shardRouting, node, allocation); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java index 1f3f995deaf0d..83bcdcd30130c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java @@ -105,6 +105,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return YES_NONE_HOLD_COPY; } + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + 
return canAllocate(shardRouting, node, allocation); + } + private static Decision debugNoAlreadyAllocatedToHost(RoutingNode node, RoutingAllocation allocation, boolean checkNodeOnSameHostAddress) { String hostType = checkNodeOnSameHostAddress ? "address" : "name"; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java index 9bb4b98985002..69b7f59911bfe 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java @@ -44,6 +44,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return canMove(shardRouting, allocation); } + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return canAllocate(shardRouting, node, allocation); + } + private Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation) { if (shardRouting.primary()) { // Only primary shards are snapshotted diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 8c7bc6de7d6a1..f93494fde8d82 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -166,6 +166,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing } } + @Override + public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, 
RoutingAllocation allocation) { + return canAllocate(shardRouting, node, allocation); + } + /** * The shard routing passed to {@link #canAllocate(ShardRouting, RoutingNode, RoutingAllocation)} is not the initializing shard to this * node but: diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index b722237d4a3a4..3d6a44bb7ae08 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; @@ -199,6 +200,7 @@ public void testAllocationDeciderOrder() { SnapshotInProgressAllocationDecider.class, RestoreInProgressAllocationDecider.class, NodeShutdownAllocationDecider.class, + NodeReplacementAllocationDecider.class, FilterAllocationDecider.class, SameShardAllocationDecider.class, DiskThresholdDecider.class, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 42244edb31643..034a289f97ec6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -524,4 +524,55 @@ public void testDecidesYesIfWatermarksIgnored() { assertThat(decision.getExplanation(), containsString("disk watermarks are ignored on this index")); } + public void testCannotForceAllocateOver100PercentUsage() { + ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + final Index index = metadata.index("test").getIndex(); + + ShardRouting test_0 = ShardRouting.newUnassigned(new ShardId(index, 0), true, EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + DiscoveryNode node_0 = new DiscoveryNode("node_0", buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), Version.CURRENT); + DiscoveryNode node_1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), Version.CURRENT); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .add(node_0) + .add(node_1) + ).build(); + + // actual test -- after all that bloat :) + ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsages = ImmutableOpenMap.builder(); + leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0)); // all full + ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsage = ImmutableOpenMap.builder();
+ mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0)); // all full + + ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder(); + // bigger than available space + final long shardSize = randomIntBetween(1, 10); + shardSizes.put("[test][0][p]", shardSize); + ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), + shardSizes.build(), null, ImmutableOpenMap.of(), ImmutableOpenMap.of()); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)), + clusterState.getRoutingNodes(), clusterState, clusterInfo, null, System.nanoTime()); + allocation.debugDecision(true); + Decision decision = decider.canForceAllocateDuringReplace(test_0, new RoutingNode("node_0", node_0), allocation); + assertEquals(Decision.Type.NO, decision.type()); + + assertThat(decision.getExplanation(), containsString( + "unable to force allocate shard to [node_0] during replacement, " + + "as allocating to this node would cause disk usage to exceed 100% ([" + shardSize + "] bytes above available disk space)")); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java new file mode 100644 index 0000000000000..b31e40e4dfaf6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java @@ -0,0 +1,237 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1.
+ */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.EmptySnapshotsInfoService; +import org.elasticsearch.test.gateway.TestGatewayAllocator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class NodeReplacementAllocationDeciderTests extends ESAllocationTestCase { + private static final DiscoveryNode NODE_A = newNode("node-a", "node-a", Collections.singleton(DiscoveryNodeRole.DATA_ROLE)); + private static final DiscoveryNode NODE_B = newNode("node-b", "node-b", Collections.singleton(DiscoveryNodeRole.DATA_ROLE)); + private static final DiscoveryNode NODE_C = 
newNode("node-c", "node-c", Collections.singleton(DiscoveryNodeRole.DATA_ROLE)); + private final ShardRouting shard = ShardRouting.newUnassigned( + new ShardId("myindex", "myindex", 0), + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "index created") + ); + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private NodeReplacementAllocationDecider decider = new NodeReplacementAllocationDecider(); + private final AllocationDeciders allocationDeciders = new AllocationDeciders( + Arrays.asList( + decider, + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), + new ReplicaAfterPrimaryActiveAllocationDecider(), + new NodeShutdownAllocationDecider() + ) + ); + private final AllocationService service = new AllocationService( + allocationDeciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + + private final String idxName = "test-idx"; + private final String idxUuid = "test-idx-uuid"; + private final IndexMetadata indexMetadata = IndexMetadata.builder(idxName) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, idxUuid) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ) + .build(); + + public void testNoReplacements() { + ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes(DiscoveryNodes.builder() + .add(NODE_A) + .add(NODE_B) + .add(NODE_C) + .build()) + .build(); + + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + DiscoveryNode node = randomFrom(NODE_A, NODE_B, NODE_C); + RoutingNode routingNode = new RoutingNode(node.getId(), node, shard); + 
allocation.debugDecision(true); + + Decision decision = decider.canAllocate(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat( + decision.getExplanation(), + equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS.getExplanation()) + ); + + decision = decider.canRemain(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat( + decision.getExplanation(), + equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS.getExplanation()) + ); + } + + public void testCanForceAllocate() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), NODE_A.getId(), NODE_B.getName()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + RoutingNode routingNode = new RoutingNode(NODE_A.getId(), NODE_A, shard); + allocation.debugDecision(true); + + ShardRouting assignedShard = ShardRouting.newUnassigned( + new ShardId("myindex", "myindex", 0), + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "index created") + ); + assignedShard = assignedShard.initialize(NODE_A.getId(), null, 1); + assignedShard = assignedShard.moveToStarted(); + + Decision decision = decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + assertThat( + decision.getExplanation(), + equalTo("shard is not on the source of a node replacement relocated to the replacement target") + ); + + routingNode = new RoutingNode(NODE_B.getId(), NODE_B, assignedShard); + + decision = decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), + equalTo("node [" + NODE_A.getId() + "] is being replaced by node [" + NODE_B.getId() + + "], and can be force 
vacated to the target")); + + routingNode = new RoutingNode(NODE_C.getId(), NODE_C, assignedShard); + + decision = decider.canForceAllocateDuringReplace(assignedShard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + assertThat(decision.getExplanation(), + equalTo("shard is not on the source of a node replacement relocated to the replacement target")); + } + + public void testCannotRemainOnReplacedNode() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), NODE_A.getId(), NODE_B.getName()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + RoutingNode routingNode = new RoutingNode(NODE_A.getId(), NODE_A, shard); + allocation.debugDecision(true); + + Decision decision = decider.canRemain(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + assertThat( + decision.getExplanation(), + equalTo("node [" + NODE_A.getId() + "] is being replaced by node [" + NODE_B.getId() + "], so no data may remain on it") + ); + + routingNode = new RoutingNode(NODE_B.getId(), NODE_B, shard); + + decision = decider.canRemain(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), equalTo("node [" + NODE_B.getId() + "] is not being replaced")); + + routingNode = new RoutingNode(NODE_C.getId(), NODE_C, shard); + + decision = decider.canRemain(shard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), equalTo("node [" + NODE_C.getId() + "] is not being replaced")); + } + + public void testCanAllocateToNeitherSourceNorTarget() { + ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"), NODE_A.getId(), NODE_B.getName()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, 
state.getRoutingNodes(), state, null, null, 0); + RoutingNode routingNode = new RoutingNode(NODE_A.getId(), NODE_A, shard); + allocation.debugDecision(true); + + ShardRouting testShard = this.shard; + if (randomBoolean()) { + testShard = shard.initialize(NODE_C.getId(), null, 1); + testShard = testShard.moveToStarted(); + } + Decision decision = decider.canAllocate(testShard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + assertThat( + decision.getExplanation(), + equalTo("node [" + NODE_A.getId() + "] is being replaced by [" + NODE_B.getName() + + "], so no data may be allocated to it") + ); + + routingNode = new RoutingNode(NODE_B.getId(), NODE_B, testShard); + + decision = decider.canAllocate(testShard, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + assertThat( + decision.getExplanation(), + equalTo("node [" + NODE_B.getId() + + "] is replacing the vacating node [" + NODE_A.getId() + "], only data currently allocated " + + "to the source node may be allocated to it until the replacement is complete") + ); + + routingNode = new RoutingNode(NODE_C.getId(), NODE_C, testShard); + + decision = decider.canAllocate(testShard, routingNode, allocation); + assertThat(decision.getExplanation(), decision.type(), equalTo(Decision.Type.YES)); + assertThat( + decision.getExplanation(), + containsString("neither the source nor target node are part of an ongoing node replacement") + ); + } + + private ClusterState prepareState(ClusterState initialState, String sourceNodeId, String targetNodeName) { + final SingleNodeShutdownMetadata nodeShutdownMetadata = SingleNodeShutdownMetadata.builder() + .setNodeId(sourceNodeId) + .setTargetNodeName(targetNodeName) + .setType(SingleNodeShutdownMetadata.Type.REPLACE) + .setReason(this.getTestName()) + .setStartedAtMillis(1L) + .build(); + NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(new HashMap<>()).putSingleNodeMetadata( + 
nodeShutdownMetadata + ); + return ClusterState.builder(initialState) + .nodes(DiscoveryNodes.builder() + .add(NODE_A) + .add(NODE_B) + .add(NODE_C) + .build()) + .metadata(Metadata.builder().put(IndexMetadata.builder(indexMetadata)) + .putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata)) + .build(); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 97d6b4e80bf60..2dd0afa7a490a 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -129,6 +129,10 @@ protected static DiscoveryNode newNode(String nodeId, Set<DiscoveryNodeRole> rol return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); } + protected static DiscoveryNode newNode(String nodeName, String nodeId, Set<DiscoveryNodeRole> roles) { + return new DiscoveryNode(nodeName, nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); + } + protected static DiscoveryNode newNode(String nodeId, Version version) { return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), MASTER_DATA_ROLES, version); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java index e5a16c4e1dd25..7ddcf1d1a2b5d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; @@ -82,7 +83,8 @@ public void performAction(IndexMetadata indexMetadata, ClusterState clusterState new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), new DataTierAllocationDecider(clusterState.getMetadata().settings(), new ClusterSettings(Settings.EMPTY, ALL_CLUSTER_SETTINGS)), new NodeVersionAllocationDecider(), - new NodeShutdownAllocationDecider() + new NodeShutdownAllocationDecider(), + new NodeReplacementAllocationDecider() )); final RoutingNodes routingNodes = clusterState.getRoutingNodes(); RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, null, diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java index 948f63dddb358..68fe45a4b0ad1 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Build; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -20,6 +21,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; 
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -28,8 +30,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Status.COMPLETE; +import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Status.STALLED; import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0) @@ -172,7 +176,6 @@ public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() throws Exception { ); AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); assertTrue(putShutdownResponse.isAcknowledged()); - assertBusy(() -> { GetShutdownStatusAction.Response getResp = client().execute( GetShutdownStatusAction.INSTANCE, @@ -183,6 +186,258 @@ public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() throws Exception { }); } + public void testNodeReplacementOnlyAllowsShardsFromReplacedNode() throws Exception { + String nodeA = internalCluster().startNode(Settings.builder().put("node.name", "node-a")); + Settings.Builder nodeASettings = Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 1); + createIndex("myindex", nodeASettings.build()); + final String nodeAId = getNodeId(nodeA); + final String nodeB = "node_t1"; // TODO: fix this so it's actually overridable + + // Mark the nodeA as being replaced + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeAId, + SingleNodeShutdownMetadata.Type.REPLACE, +
this.getTestName(), + null, + nodeB + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + GetShutdownStatusAction.Response getResp = client().execute( + GetShutdownStatusAction.INSTANCE, + new GetShutdownStatusAction.Request(nodeAId) + ).get(); + + assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(STALLED)); + + internalCluster().startNode(Settings.builder().put("node.name", nodeB)); + final String nodeBId = getNodeId(nodeB); + + logger.info("--> NodeA: {} -- {}", nodeA, nodeAId); + logger.info("--> NodeB: {} -- {}", nodeB, nodeBId); + + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().clear().setRoutingTable(true).get().getState(); + int active = 0; + for (ShardRouting sr : state.routingTable().allShards("myindex")) { + if (sr.active()) { + active++; + assertThat( + "expected shard on nodeB (" + nodeBId + ") but it was on a different node", + sr.currentNodeId(), + equalTo(nodeBId) + ); + } + } + assertThat("expected all 3 of the primary shards to be allocated", active, equalTo(3)); + }); + + assertBusy(() -> { + GetShutdownStatusAction.Response shutdownStatus = client().execute( + GetShutdownStatusAction.INSTANCE, + new GetShutdownStatusAction.Request(nodeAId) + ).get(); + assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE)); + }); + + final String nodeC = internalCluster().startNode(); + + createIndex("other", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); + + ensureYellow("other"); + + // Explain the replica for the "other" index + ClusterAllocationExplainResponse explainResponse = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("other") + .setShard(0) + .setPrimary(false) + .get(); + + // Validate that the replica cannot be allocated to 
nodeB because it's the target of a node replacement + explainResponse.getExplanation() + .getShardAllocationDecision() + .getAllocateDecision() + .getNodeDecisions() + .stream() + .filter(nodeDecision -> nodeDecision.getNode().getId().equals(nodeBId)) + .findFirst() + .ifPresent(nodeAllocationResult -> { + assertThat(nodeAllocationResult.getCanAllocateDecision().type(), equalTo(Decision.Type.NO)); + assertTrue( + "expected decisions to mention node replacement: " + + nodeAllocationResult.getCanAllocateDecision() + .getDecisions() + .stream() + .map(Decision::getExplanation) + .collect(Collectors.joining(",")), + nodeAllocationResult.getCanAllocateDecision() + .getDecisions() + .stream() + .anyMatch( + decision -> decision.getExplanation().contains("is replacing the vacating node") + && decision.getExplanation().contains("may be allocated to it until the replacement is complete") + ) + ); + }); + } + + public void testNodeReplacementOverridesFilters() throws Exception { + String nodeA = internalCluster().startNode(Settings.builder().put("node.name", "node-a")); + // Create an index and pin it to nodeA; when we replace it with nodeB, + // it'll move the data, overriding the `_name` allocation filter + Settings.Builder nodeASettings = Settings.builder() + .put("index.routing.allocation.require._name", nodeA) + .put("index.number_of_shards", 3) + .put("index.number_of_replicas", 0); + createIndex("myindex", nodeASettings.build()); + final String nodeAId = getNodeId(nodeA); + final String nodeB = "node_t2"; // TODO: fix this so it's actually overridable + + // Mark the nodeA as being replaced + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeAId, + SingleNodeShutdownMetadata.Type.REPLACE, + this.getTestName(), + null, + nodeB + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + +
+        GetShutdownStatusAction.Response getResp = client().execute(
+            GetShutdownStatusAction.INSTANCE,
+            new GetShutdownStatusAction.Request(nodeAId)
+        ).get();
+
+        assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(STALLED));
+
+        final String nodeC = internalCluster().startNode();
+        internalCluster().startNode(Settings.builder().put("node.name", nodeB));
+        final String nodeBId = getNodeId(nodeB);
+
+        logger.info("--> NodeA: {} -- {}", nodeA, nodeAId);
+        logger.info("--> NodeB: {} -- {}", nodeB, nodeBId);
+
+        assertBusy(() -> {
+            ClusterState state = client().admin().cluster().prepareState().clear().setRoutingTable(true).get().getState();
+            for (ShardRouting sr : state.routingTable().allShards("myindex")) {
+                assertThat(
+                    "expected shard on nodeB (" + nodeBId + ") but it was on a different node",
+                    sr.currentNodeId(),
+                    equalTo(nodeBId)
+                );
+            }
+        });
+
+        assertBusy(() -> {
+            GetShutdownStatusAction.Response shutdownStatus = client().execute(
+                GetShutdownStatusAction.INSTANCE,
+                new GetShutdownStatusAction.Request(nodeAId)
+            ).get();
+            assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
+        });
+
+        createIndex("other", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
+
+        ensureYellow("other");
+
+        // Explain the replica for the "other" index
+        ClusterAllocationExplainResponse explainResponse = client().admin()
+            .cluster()
+            .prepareAllocationExplain()
+            .setIndex("other")
+            .setShard(0)
+            .setPrimary(false)
+            .get();
+
+        // Validate that the replica cannot be allocated to nodeB because it's the target of a node replacement
+        explainResponse.getExplanation()
+            .getShardAllocationDecision()
+            .getAllocateDecision()
+            .getNodeDecisions()
+            .stream()
+            .filter(nodeDecision -> nodeDecision.getNode().getId().equals(nodeBId))
+            .findFirst()
+            .ifPresent(nodeAllocationResult -> {
+                assertThat(nodeAllocationResult.getCanAllocateDecision().type(), equalTo(Decision.Type.NO));
+                assertTrue(
+                    "expected decisions to mention node replacement: "
+                        + nodeAllocationResult.getCanAllocateDecision()
+                            .getDecisions()
+                            .stream()
+                            .map(Decision::getExplanation)
+                            .collect(Collectors.joining(",")),
+                    nodeAllocationResult.getCanAllocateDecision()
+                        .getDecisions()
+                        .stream()
+                        .anyMatch(
+                            decision -> decision.getExplanation().contains("is replacing the vacating node")
+                                && decision.getExplanation().contains("may be allocated to it until the replacement is complete")
+                        )
+                );
+            });
+    }
+
+    public void testNodeReplacementOnlyToTarget() throws Exception {
+        String nodeA = internalCluster().startNode(
+            Settings.builder().put("node.name", "node-a").put("cluster.routing.rebalance.enable", "none")
+        );
+        Settings.Builder nodeASettings = Settings.builder().put("index.number_of_shards", 4).put("index.number_of_replicas", 0);
+        createIndex("myindex", nodeASettings.build());
+        final String nodeAId = getNodeId(nodeA);
+        final String nodeB = "node_t1"; // TODO: fix this so it's actually overridable
+        final String nodeC = "node_t2"; // TODO: fix this so it's actually overridable
+
+        // Mark nodeA as being replaced
+        PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
+            nodeAId,
+            SingleNodeShutdownMetadata.Type.REPLACE,
+            this.getTestName(),
+            null,
+            nodeB
+        );
+        AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get();
+        assertTrue(putShutdownResponse.isAcknowledged());
+
+        GetShutdownStatusAction.Response getResp = client().execute(
+            GetShutdownStatusAction.INSTANCE,
+            new GetShutdownStatusAction.Request(nodeAId)
+        ).get();
+
+        assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(STALLED));
+
+        internalCluster().startNode(Settings.builder().put("node.name", nodeB));
+        internalCluster().startNode(Settings.builder().put("node.name", nodeC));
+        final String nodeBId = getNodeId(nodeB);
+        final String nodeCId = getNodeId(nodeC);
+
+        logger.info("--> NodeA: {} -- {}", nodeA, nodeAId);
+        logger.info("--> NodeB: {} -- {}", nodeB, nodeBId);
+        logger.info("--> NodeC: {} -- {}", nodeC, nodeCId);
+
+        assertBusy(() -> {
+            ClusterState state = client().admin().cluster().prepareState().clear().setRoutingTable(true).get().getState();
+            for (ShardRouting sr : state.routingTable().allShards("myindex")) {
+                assertThat(
+                    "expected all shards for index to be on node B (" + nodeBId + ") but " + sr.toString() + " is on " + sr.currentNodeId(),
+                    sr.currentNodeId(),
+                    equalTo(nodeBId)
+                );
+            }
+        });
+
+        assertBusy(() -> {
+            GetShutdownStatusAction.Response shutdownStatus = client().execute(
+                GetShutdownStatusAction.INSTANCE,
+                new GetShutdownStatusAction.Request(nodeAId)
+            ).get();
+            assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
+        });
+    }
+
     private void indexRandomData() throws Exception {
         int numDocs = scaledRandomIntBetween(100, 1000);
         IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -211,7 +466,7 @@ private String findIdOfNodeWithPrimaryShard(String indexName) {
         );
     }
 
-    private String getNodeId(String nodeName) throws Exception {
+    private String getNodeId(String nodeName) {
         NodesInfoResponse nodes = client().admin().cluster().prepareNodesInfo().clear().get();
         return nodes.getNodes()
             .stream()
diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java
index 085c462f84dc3..9aeedaac074ed 100644
--- a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java
+++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java
@@ -30,6 +30,7 @@
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
@@ -79,35 +80,39 @@ private void setup() {
         clusterInfoService = EmptyClusterInfoService.INSTANCE;
 
         allocationDeciders = new AllocationDeciders(
-            org.elasticsearch.core.List.of(new NodeShutdownAllocationDecider(), new AllocationDecider() {
-                @Override
-                public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
-                    return canAllocate.get().test(shardRouting, node, allocation);
+            org.elasticsearch.core.List.of(
+                new NodeShutdownAllocationDecider(),
+                new NodeReplacementAllocationDecider(),
+                new AllocationDecider() {
+                    @Override
+                    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+                        return canAllocate.get().test(shardRouting, node, allocation);
+                    }
+
+                    @Override
+                    public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
+                        // No behavior should change based on rebalance decisions
+                        return Decision.NO;
+                    }
+
+                    @Override
+                    public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+                        return canRemain.get().test(shardRouting, node, allocation);
+                    }
+
+                    @Override
+                    public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
+                        // No behavior relevant to these tests should change based on auto expansion decisions
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public Decision canRebalance(RoutingAllocation allocation) {
+                        // No behavior should change based on rebalance decisions
+                        return Decision.NO;
+                    }
                 }
-
-                @Override
-                public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
-                    // No behavior should change based on rebalance decisions
-                    return Decision.NO;
-                }
-
-                @Override
-                public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
-                    return canRemain.get().test(shardRouting, node, allocation);
-                }
-
-                @Override
-                public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
-                    // No behavior relevant to these tests should change based on auto expansion decisions
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public Decision canRebalance(RoutingAllocation allocation) {
-                    // No behavior should change based on rebalance decisions
-                    return Decision.NO;
-                }
-            })
+            )
         );
         snapshotsInfoService = () -> new SnapshotShardSizeInfo(
             new ImmutableOpenMap.Builder().build()