diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
index 1b3f7d126b14f..6d51e518cc967 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
@@ -23,6 +23,7 @@
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.RoutingNode;
@@ -38,6 +39,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.index.Index;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -49,6 +51,8 @@
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * Listens for a node to go over the high watermark and kicks off an empty
@@ -314,10 +318,29 @@ public void onNewInfo(ClusterInfo info) {
             logger.trace("no reroute required");
             listener.onResponse(null);
         }
-        final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().stream()
-            .map(Map.Entry::getKey)
+
+        // Generate a map of node name to ID so we can use it to look up node replacement targets
+        final Map<String, String> nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false)
+            .collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
+
+        // Calculate both the source node id and the target node id of a "replace" type shutdown
+        final Set<String> nodesIdsPartOfReplacement = state.metadata().nodeShutdowns().values().stream()
+            .filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
+            .flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
+            .collect(Collectors.toSet());
+
+        // Generate a set of all the indices that exist on either the target or source of a node replacement
+        final Set<String> indicesOnReplaceSourceOrTarget = nodesIdsPartOfReplacement.stream()
+            .flatMap(nodeId -> state.getRoutingNodes().node(nodeId).copyShards().stream()
+                .map(ShardRouting::index)
+                .map(Index::getName))
+            .collect(Collectors.toSet());
+
+        final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().keySet().stream()
             .filter(index -> indicesNotToAutoRelease.contains(index) == false)
             .filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
+            // Do not auto release indices that are on either the source or the target of a node replacement
+            .filter(index -> indicesOnReplaceSourceOrTarget.contains(index) == false)
             .collect(Collectors.toSet());
 
         if (indicesToAutoRelease.isEmpty() == false) {
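The production change above boils down to one rule: never auto-release the read-only-allow-delete block on an index that has a shard on either end of a REPLACE-type node shutdown, resolving the target end from node name to node id. The following is a minimal, self-contained sketch of that rule, not part of the patch; ReplaceShutdown, nodeNameToId and indicesByNodeId are simplified stand-ins for SingleNodeShutdownMetadata and RoutingNodes, not Elasticsearch APIs.

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Self-contained sketch of the exclusion rule added above (illustrative types only).
public class AutoReleaseExclusionSketch {

    /** A REPLACE-type shutdown: the id of the node being vacated and the *name* of its target. */
    static final class ReplaceShutdown {
        final String sourceNodeId;
        final String targetNodeName;

        ReplaceShutdown(String sourceNodeId, String targetNodeName) {
            this.sourceNodeId = sourceNodeId;
            this.targetNodeName = targetNodeName;
        }
    }

    static Set<String> indicesExemptFromAutoRelease(
        Collection<ReplaceShutdown> replaceShutdowns,
        Map<String, String> nodeNameToId,           // node name -> node id
        Map<String, Set<String>> indicesByNodeId) { // node id -> indices hosted on that node

        // Both ends of a replacement are excluded: the node being vacated and its target.
        final Set<String> nodeIds = replaceShutdowns.stream()
            .flatMap(s -> Stream.of(s.sourceNodeId, nodeNameToId.get(s.targetNodeName)))
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());

        // Any index with a shard on one of those nodes keeps its read-only block.
        return nodeIds.stream()
            .flatMap(id -> indicesByNodeId.getOrDefault(id, Collections.emptySet()).stream())
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        final Map<String, String> nameToId = new HashMap<>();
        nameToId.put("my-node1", "node1");
        nameToId.put("my-node2", "node2");

        final Map<String, Set<String>> indicesByNode = new HashMap<>();
        indicesByNode.put("node1", new HashSet<>(Arrays.asList("test_1")));
        indicesByNode.put("node2", new HashSet<>(Arrays.asList("test_1", "test_2")));

        // node1 is being replaced by my-node2, so indices on node1 and node2 are both exempt.
        System.out.println(indicesExemptFromAutoRelease(
            Collections.singletonList(new ReplaceShutdown("node1", "my-node2")),
            nameToId, indicesByNode)); // prints [test_1, test_2] in some order
    }
}

Note the name-to-id lookup: a REPLACE shutdown records its target by node name, while routing is keyed by node id, which is why the monitor builds nodeNameToId before resolving both ends of the replacement.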
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
index e0dedd9c1b986..b726867e063c3 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
@@ -21,6 +21,8 @@
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -411,6 +413,154 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
         assertNull(indicesToRelease.get());
     }
 
+    public void testNoAutoReleaseOfIndicesOnReplacementNodes() {
+        AtomicReference<Set<String>> indicesToMarkReadOnly = new AtomicReference<>();
+        AtomicReference<Set<String>> indicesToRelease = new AtomicReference<>();
+        AtomicReference<ClusterState> currentClusterState = new AtomicReference<>();
+        AllocationService allocation = createAllocationService(Settings.builder()
+            .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
+        Metadata metadata = Metadata.builder()
+            .put(IndexMetadata.builder("test_1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
+            .put(IndexMetadata.builder("test_2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
+            .build();
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsNew(metadata.index("test_1"))
+            .addAsNew(metadata.index("test_2"))
+            .build();
+        final ClusterState clusterState = applyStartedShardsUntilNoChange(
+            ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+                .metadata(metadata).routingTable(routingTable)
+                .nodes(DiscoveryNodes.builder().add(newNormalNode("node1", "my-node1"))
+                    .add(newNormalNode("node2", "my-node2"))).build(), allocation);
+        assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));
+
+        final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpacesBuilder
+            = ImmutableOpenMap.builder();
+        final int reservedSpaceNode1 = between(0, 10);
+        reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"),
+            new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode1).build());
+        final int reservedSpaceNode2 = between(0, 10);
+        reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node2", "/foo/bar"),
+            new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode2).build());
+        ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpaces = reservedSpacesBuilder.build();
+
+        currentClusterState.set(clusterState);
+
+        final DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, currentClusterState::get,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
+            (reason, priority, listener) -> {
+                assertNotNull(listener);
+                assertThat(priority, equalTo(Priority.HIGH));
+                listener.onResponse(currentClusterState.get());
+            }) {
+            @Override
+            protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
+                if (readOnly) {
+                    assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
+                } else {
+                    assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
+                }
+                listener.onResponse(null);
+            }
+        };
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // Reserved space is ignored when applying block
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 90)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 90)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // Change cluster state so that "test_2" index is blocked (read only)
+        IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder()
+            .put(clusterState.metadata()
+                .index("test_2").getSettings())
+            .put(IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();
+
+        final String sourceNode;
+        final String targetNode;
+        if (randomBoolean()) {
+            sourceNode = "node1";
+            targetNode = "my-node2";
+        } else {
+            sourceNode = "node2";
+            targetNode = "my-node1";
+        }
+
+        final ClusterState clusterStateWithBlocks = ClusterState.builder(clusterState)
+            .metadata(Metadata.builder(clusterState.metadata())
+                .put(indexMetadata, true)
+                .putCustom(NodesShutdownMetadata.TYPE,
+                    new NodesShutdownMetadata(Collections.singletonMap(sourceNode,
+                        SingleNodeShutdownMetadata.builder()
+                            .setNodeId(sourceNode)
+                            .setReason("testing")
+                            .setType(SingleNodeShutdownMetadata.Type.REPLACE)
+                            .setTargetNodeName(targetNode)
+                            .setStartedAtMillis(randomNonNegativeLong())
+                            .build())))
+                .build())
+            .blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
+            .build();
+
+        assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
+
+        currentClusterState.set(clusterStateWithBlocks);
+
+        // When free disk on any of node1 or node2 goes below 5% flood watermark, then apply index block on indices not having the block
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
+        assertNull(indicesToRelease.get());
+
+        // While the REPLACE is ongoing the lock will not be removed from the index
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        final ClusterState clusterStateNoShutdown = ClusterState.builder(clusterState)
+            .metadata(Metadata.builder(clusterState.metadata())
+                .put(indexMetadata, true)
+                .removeCustom(NodesShutdownMetadata.TYPE)
+                .build())
+            .blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
+            .build();
+
+        assertTrue(clusterStateNoShutdown.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
+
+        currentClusterState.set(clusterStateNoShutdown);
+
+        // Now that the REPLACE is gone, auto-releasing can occur for the index
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertThat(indicesToRelease.get(), contains("test_2"));
+    }
+
     @TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging")
     public void testDiskMonitorLogging() throws IllegalAccessException {
         final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
@@ -657,14 +807,18 @@ private static DiscoveryNode newFrozenOnlyNode(String nodeId) {
             Sets.union(org.elasticsearch.core.Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE), irrelevantRoles));
     }
 
-    private static DiscoveryNode newNormalNode(String nodeId) {
+    private static DiscoveryNode newNormalNode(String nodeId, String nodeName) {
         Set<DiscoveryNodeRole> randomRoles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
         Set<DiscoveryNodeRole> roles = Sets.union(randomRoles,
             org.elasticsearch.core.Set.of(randomFrom(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE,
                 DiscoveryNodeRole.DATA_HOT_NODE_ROLE, DiscoveryNodeRole.DATA_WARM_NODE_ROLE, DiscoveryNodeRole.DATA_COLD_NODE_ROLE)));
-        return newNode(nodeId, roles);
+        return newNode(nodeName, nodeId, roles);
+    }
+
+    private static DiscoveryNode newNormalNode(String nodeId) {
+        return newNormalNode(nodeId, "");
     }
 
     // java 11 forward compatibility
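The test above drives the monitor by swapping the cluster state behind an AtomicReference and by overriding the protected updateIndicesReadOnly hook so that each stimulus is captured at most once (compareAndSet(null, ...) fails the test on an unexpected second call). Below is a minimal, hedged sketch of just that capture pattern; FakeMonitor and its methods are hypothetical stand-ins, not Elasticsearch classes.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative stand-in for the capture pattern used by the test's anonymous
// DiskThresholdMonitor subclass: each callback records its argument exactly once via
// compareAndSet(null, ...), so an unexpected second invocation fails loudly.
public class CaptureOnceSketch {

    static final class FakeMonitor {
        final AtomicReference<Set<String>> markedReadOnly = new AtomicReference<>();
        final AtomicReference<Set<String>> released = new AtomicReference<>();

        // Analogue of the overridden updateIndicesReadOnly(indices, listener, readOnly).
        void updateIndicesReadOnly(Set<String> indices, boolean readOnly) {
            final AtomicReference<Set<String>> slot = readOnly ? markedReadOnly : released;
            if (slot.compareAndSet(null, indices) == false) {
                throw new AssertionError("callback invoked more than once for a single stimulus");
            }
        }

        // The test resets both slots to null before feeding the monitor new disk usage info.
        void reset() {
            markedReadOnly.set(null);
            released.set(null);
        }
    }

    public static void main(String[] args) {
        final FakeMonitor monitor = new FakeMonitor();
        monitor.updateIndicesReadOnly(new HashSet<>(Arrays.asList("test_1", "test_2")), true);
        System.out.println("marked read-only: " + monitor.markedReadOnly.get());
        System.out.println("released: " + monitor.released.get()); // null -> nothing was auto-released
        monitor.reset();
    }
}

The newNormalNode(String nodeId, String nodeName) overload added at the end of the diff exists for the same reason as the name-to-id map in the production code: REPLACE shutdowns identify their target by node name, so the test needs nodes with stable names ("my-node1", "my-node2") in addition to ids.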