Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Do not remove flood block from indices on nodes undergoing replacement (#78942) #79008

Merged
merged 1 commit into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNode;
Expand All @@ -38,6 +39,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -49,6 +51,8 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Listens for a node to go over the high watermark and kicks off an empty
Expand Down Expand Up @@ -314,10 +318,29 @@ public void onNewInfo(ClusterInfo info) {
logger.trace("no reroute required");
listener.onResponse(null);
}
final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().stream()
.map(Map.Entry::getKey)

// Generate a map of node name to ID so we can use it to look up node replacement targets
final Map<String, String> nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false)
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));

// Calculate both the source node id and the target node id of a "replace" type shutdown
final Set<String> nodesIdsPartOfReplacement = state.metadata().nodeShutdowns().values().stream()
.filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
.flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
.collect(Collectors.toSet());

// Generate a set of all the indices that exist on either the target or source of a node replacement
final Set<String> indicesOnReplaceSourceOrTarget = nodesIdsPartOfReplacement.stream()
.flatMap(nodeId -> state.getRoutingNodes().node(nodeId).copyShards().stream()
.map(ShardRouting::index)
.map(Index::getName))
.collect(Collectors.toSet());

final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().keySet().stream()
.filter(index -> indicesNotToAutoRelease.contains(index) == false)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
// Do not auto release indices that are on either the source or the target of a node replacement
.filter(index -> indicesOnReplaceSourceOrTarget.contains(index) == false)
.collect(Collectors.toSet());

if (indicesToAutoRelease.isEmpty() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -411,6 +413,154 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
assertNull(indicesToRelease.get());
}

/**
 * Checks that the monitor does not auto-release the read-only-allow-delete block of an index
 * while a REPLACE-type node shutdown is registered in the cluster metadata, and that the block
 * is auto-released again once the shutdown metadata has been removed.
 *
 * The test drives the monitor through several {@code onNewInfo} rounds. Before each round the two
 * recording references are reset to {@code null}; after each round they hold the set of indices
 * the monitor asked to mark read-only / to release, or {@code null} if no such request was made.
 */
public void testNoAutoReleaseOfIndicesOnReplacementNodes() {
// Recorders for the index sets passed to updateIndicesReadOnly (see the override below)
AtomicReference<Set<String>> indicesToMarkReadOnly = new AtomicReference<>();
AtomicReference<Set<String>> indicesToRelease = new AtomicReference<>();
// Holds the cluster state the monitor sees; swapped between phases of the test
AtomicReference<ClusterState> currentClusterState = new AtomicReference<>();
AllocationService allocation = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
// Two indices, each with 2 primaries and 1 replica per primary
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test_1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
.put(IndexMetadata.builder("test_2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metadata.index("test_1"))
.addAsNew(metadata.index("test_2"))
.build();
// Two nodes with ids "node1"/"node2" and names "my-node1"/"my-node2"; the distinct names matter
// because the shutdown metadata below identifies the replacement target by node name
final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata).routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(newNormalNode("node1", "my-node1"))
.add(newNormalNode("node2", "my-node2"))).build(), allocation);
// 2 indices * 2 shards * (1 primary + 1 replica) = 8 started shard copies
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));

// A small random amount of reserved space on each node's "/foo/bar" path
final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpacesBuilder
= ImmutableOpenMap.builder();
final int reservedSpaceNode1 = between(0, 10);
reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"),
new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode1).build());
final int reservedSpaceNode2 = between(0, 10);
reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node2", "/foo/bar"),
new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode2).build());
ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpaces = reservedSpacesBuilder.build();

currentClusterState.set(clusterState);

// The lambda passed as the last constructor argument checks that it is invoked with a listener and
// HIGH priority, then completes immediately with the current cluster state
final DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, currentClusterState::get,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
(reason, priority, listener) -> {
assertNotNull(listener);
assertThat(priority, equalTo(Priority.HIGH));
listener.onResponse(currentClusterState.get());
}) {
@Override
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
// Record the request instead of updating any index; compareAndSet(null, ...) makes the test fail
// if the monitor issues more than one request of the same kind between resets
if (readOnly) {
assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
} else {
assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
}
listener.onResponse(null);
}
};
// Round 1: both nodes report a value in [0, 4] as the last DiskUsage argument (presumably free bytes
// out of a total of 100 - confirm against DiskUsage); both indices are expected to be marked read-only
indicesToMarkReadOnly.set(null);
indicesToRelease.set(null);
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
assertNull(indicesToRelease.get());

// Reserved space is ignored when applying block
// Round 2: with values in [5, 90] nothing is marked read-only; nothing is released either, since the
// cluster state used so far carries no index blocks
indicesToMarkReadOnly.set(null);
indicesToRelease.set(null);
builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 90)));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 90)));
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
assertNull(indicesToMarkReadOnly.get());
assertNull(indicesToRelease.get());

// Change cluster state so that "test_2" index is blocked (read only)
IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder()
.put(clusterState.metadata()
.index("test_2").getSettings())
.put(IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();

// Randomly pick which node is being replaced; the source is given by node id and the target by
// node name, so either way both nodes take part in the replacement
final String sourceNode;
final String targetNode;
if (randomBoolean()) {
sourceNode = "node1";
targetNode = "my-node2";
} else {
sourceNode = "node2";
targetNode = "my-node1";
}

// Cluster state with the blocked "test_2" index plus a REPLACE-type shutdown record for the source node
final ClusterState clusterStateWithBlocks = ClusterState.builder(clusterState)
.metadata(Metadata.builder(clusterState.metadata())
.put(indexMetadata, true)
.putCustom(NodesShutdownMetadata.TYPE,
new NodesShutdownMetadata(Collections.singletonMap(sourceNode,
SingleNodeShutdownMetadata.builder()
.setNodeId(sourceNode)
.setReason("testing")
.setType(SingleNodeShutdownMetadata.Type.REPLACE)
.setTargetNodeName(targetNode)
.setStartedAtMillis(randomNonNegativeLong())
.build())))
.build())
.blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
.build();

// Sanity check: the setting above must surface as a write-level block on "test_2"
assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));

currentClusterState.set(clusterStateWithBlocks);

// When free disk on any of node1 or node2 goes below 5% flood watermark, then apply index block on indices not having the block
// Round 3: only "test_1" is expected here because "test_2" already carries the block
indicesToMarkReadOnly.set(null);
indicesToRelease.set(null);
builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
assertNull(indicesToRelease.get());

// While the REPLACE is ongoing the block will not be removed from the index
// Round 4: both nodes report values in [10, 100], yet no release request is expected
indicesToMarkReadOnly.set(null);
indicesToRelease.set(null);
builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
assertNull(indicesToMarkReadOnly.get());
assertNull(indicesToRelease.get());

// Same blocked "test_2" index as before, but with the shutdown metadata removed
final ClusterState clusterStateNoShutdown = ClusterState.builder(clusterState)
.metadata(Metadata.builder(clusterState.metadata())
.put(indexMetadata, true)
.removeCustom(NodesShutdownMetadata.TYPE)
.build())
.blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
.build();

// Sanity check: "test_2" is still blocked in the state without the shutdown record
assertTrue(clusterStateNoShutdown.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));

currentClusterState.set(clusterStateNoShutdown);

// Now that the REPLACE is gone, auto-releasing can occur for the index
// Round 5: same disk usage ranges as round 4, but now "test_2" is expected to be released
indicesToMarkReadOnly.set(null);
indicesToRelease.set(null);
builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
assertNull(indicesToMarkReadOnly.get());
assertThat(indicesToRelease.get(), contains("test_2"));
}

@TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging")
public void testDiskMonitorLogging() throws IllegalAccessException {
final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
Expand Down Expand Up @@ -657,14 +807,18 @@ private static DiscoveryNode newFrozenOnlyNode(String nodeId) {
Sets.union(org.elasticsearch.core.Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE), irrelevantRoles));
}

private static DiscoveryNode newNormalNode(String nodeId) {
private static DiscoveryNode newNormalNode(String nodeId, String nodeName) {
Set<DiscoveryNodeRole> randomRoles =
new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
Set<DiscoveryNodeRole> roles = Sets.union(randomRoles,
org.elasticsearch.core.Set.of(randomFrom(DiscoveryNodeRole.DATA_ROLE,
DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE, DiscoveryNodeRole.DATA_HOT_NODE_ROLE, DiscoveryNodeRole.DATA_WARM_NODE_ROLE,
DiscoveryNodeRole.DATA_COLD_NODE_ROLE)));
return newNode(nodeId, roles);
return newNode(nodeName, nodeId, roles);
}

/**
 * Convenience overload for callers that do not care about the node name: delegates to
 * {@code newNormalNode(nodeId, nodeName)} with an empty string as the name.
 */
private static DiscoveryNode newNormalNode(String nodeId) {
    final String unnamed = "";
    return newNormalNode(nodeId, unnamed);
}

// java 11 forward compatibility
Expand Down