diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2f9075b6a4ed3..2ac53eaa12ff5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -108,6 +108,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ## [Unreleased 2.x]
 ### Added
+- Added index create block when all nodes have breached the high disk watermark ([#5852](https://github.com/opensearch-project/OpenSearch/pull/5852))
 - Added cluster manager throttling stats in nodes/stats API ([#5790](https://github.com/opensearch-project/OpenSearch/pull/5790))
 
 ### Dependencies
diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
index 4664648c03ccc..561e4349a4890 100644
--- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
@@ -39,11 +39,14 @@
 import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.action.admin.indices.stats.ShardStats;
 import org.opensearch.action.index.IndexRequestBuilder;
 import org.opensearch.cluster.ClusterInfoService;
+import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.InternalClusterInfoService;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.routing.IndexShardRoutingTable;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.ShardRoutingState;
@@ -82,12 +85,15 @@
 import java.nio.file.NotDirectoryException;
 import java.nio.file.Path;
 import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -126,7 +132,9 @@ public void removeFilesystemProvider() {
         defaultFileSystem = null;
     }
 
-    private static final long WATERMARK_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
+    // Increase the watermark limit to avoid flaky test case failures.
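+    // The value is also used as the free-space margin when a node's mock file store is shrunk (see populateNode below),
+    // so a 1 MB margin is far less sensitive to small shard-size fluctuations than the previous 10 KB.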
+    private static final long WATERMARK_BYTES = new ByteSizeValue(1, ByteSizeUnit.MB).getBytes();
+    private static final String INDEX_ROUTING_ALLOCATION_NODE_SETTING = "index.routing.allocation.include._name";
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
@@ -167,16 +175,7 @@ public void testHighWatermarkNotExceeded() throws Exception {
         final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
 
         final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
-        createIndex(
-            indexName,
-            Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
-                .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
-                .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
-                .build()
-        );
-        final long minShardSize = createReasonableSizedShards(indexName);
+        final long minShardSize = createAndPopulateIndex(indexName, null);
 
         // reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
         // (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
@@ -188,6 +187,124 @@ public void testHighWatermarkNotExceeded() throws Exception {
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
     }
 
+    public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
+        ensureStableCluster(3);
+
+        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
+            .getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
+        internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
+
+        // Reduce disk space on every data node until all of them breach the high disk watermark.
+        for (final String dataNodeName : dataNodeNames) {
+            populateNode(dataNodeName);
+        }
+
+        // Wait for all nodes to breach the high disk watermark.
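+        // refreshDiskUsage() is retried inside assertBusy so that the cluster info eventually reflects the shrunken file stores.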
+        assertBusy(() -> {
+            refreshDiskUsage();
+            assertTrue(
+                StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
+                    .allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
+            );
+        }, 30L, TimeUnit.SECONDS);
+
+        // Validate that the index create block is applied on the cluster.
+        ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
+        assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
+    }
+
+    public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
+        ensureStableCluster(3);
+
+        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
+            .getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
+        internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
+
+        // Validate that the index create block is not applied on the cluster.
+        ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
+        assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
+    }
+
+    public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
+        final List<String> indexNames = new ArrayList<>();
+        ensureStableCluster(3);
+
+        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
+            .getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
+        internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
+
+        // Reduce disk space on every data node until all of them breach the high disk watermark.
+        for (final String dataNodeName : dataNodeNames) {
+            final String indexName = populateNode(dataNodeName);
+            indexNames.add(indexName);
+        }
+
+        // Wait for all nodes to breach the high disk watermark.
+        assertBusy(() -> {
+            refreshDiskUsage();
+            assertTrue(
+                StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
+                    .allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
+            );
+        }, 30L, TimeUnit.SECONDS);
+
+        // Validate that the index create block is applied on the cluster.
+        ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
+        assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
+
+        // Delete the indices to free up disk space.
+        deleteIndices(indexNames);
+
+        // Validate that the index create block is removed from the cluster.
+        assertBusy(() -> {
+            refreshDiskUsage();
+            ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
+            assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
+        }, 30L, TimeUnit.SECONDS);
+    }
+
+    public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+        final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
+        ensureStableCluster(3);
+        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
+            .getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
+        internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
+
+        // Create and populate an index on the first data node.
+        final String indexName = populateNode(dataNodeNames.get(0));
+
+        // Reduce disk space on all other nodes until all of them breach the high disk watermark.
+        for (int i = 1; i < dataNodeNames.size(); i++) {
+            populateNode(dataNodeNames.get(i));
+        }
+
+        // Apply a read_only_allow_delete block on one of the indices
+        // (this can happen when the corresponding node has breached the flood stage watermark).
+        final Settings readOnlySettings = Settings.builder()
+            .put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
+            .build();
+        client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get();
+
+        assertBusy(() -> {
+            refreshDiskUsage();
+            assertTrue(
+                StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
+                    .allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
+            );
+        }, 30L, TimeUnit.SECONDS);
+
+        // Validate that the index create block is applied on the cluster.
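+        // The index-level read_only_allow_delete block must not prevent the cluster-wide create-index block from being applied.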
+        ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
+        assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
+    }
+
     public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
         internalCluster().startClusterManagerOnlyNode();
         internalCluster().startDataOnlyNode();
@@ -210,16 +327,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
         final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
 
         final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
-        createIndex(
-            indexName,
-            Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
-                .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
-                .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
-                .build()
-        );
-        final long minShardSize = createReasonableSizedShards(indexName);
+        final long minShardSize = createAndPopulateIndex(indexName, null);
 
         final CreateSnapshotResponse createSnapshotResponse = client().admin()
             .cluster()
@@ -274,6 +382,40 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
     }
 
+    private void deleteIndices(final List<String> indexNames) throws ExecutionException, InterruptedException {
+        for (String indexName : indexNames) {
+            assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName)).get());
+            assertFalse("index [" + indexName + "] should have been deleted", indexExists(indexName));
+        }
+    }
+
+    private String populateNode(final String dataNodeName) throws Exception {
+        final Path dataNodePath = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        long minShardSize = createAndPopulateIndex(indexName, dataNodeName);
+        fileSystemProvider.getTestFileStore(dataNodePath).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
+        refreshDiskUsage();
+        return indexName;
+    }
+
+    private long createAndPopulateIndex(final String indexName, final String nodeName) throws Exception {
+
+        final Settings.Builder indexSettingBuilder = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
+            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);
+
+        // If a node name is specified, pin the index to that node via node-name-based allocation filtering;
+        // otherwise let the shards spread freely.
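+        // A single shard pinned to one node lets populateNode() shrink exactly that node's file store relative to the shard size.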
+        if (nodeName != null) {
+            indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(INDEX_ROUTING_ALLOCATION_NODE_SETTING, nodeName);
+        } else {
+            indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6);
+        }
+
+        createIndex(indexName, indexSettingBuilder.build());
+        return createReasonableSizedShards(indexName);
+    }
+
     private Set<ShardRouting> getShardRoutings(final String nodeId, final String indexName) {
         final Set<ShardRouting> shardRoutings = new HashSet<>();
         for (IndexShardRoutingTable indexShardRoutingTable : client().admin()
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java
index 0a6cfd8c04977..4eb39caa2fdcf 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java
@@ -45,6 +45,7 @@
 import org.opensearch.cluster.DiskUsage;
 import org.opensearch.cluster.block.ClusterBlockLevel;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.routing.RerouteService;
 import org.opensearch.cluster.routing.RoutingNode;
 import org.opensearch.cluster.routing.RoutingNodes;
@@ -78,7 +79,6 @@ public class DiskThresholdMonitor {
     private static final Logger logger = LogManager.getLogger(DiskThresholdMonitor.class);
-
     private final DiskThresholdSettings diskThresholdSettings;
     private final Client client;
     private final Supplier<ClusterState> clusterStateSupplier;
@@ -286,7 +286,7 @@ public void onNewInfo(ClusterInfo info) {
             }
         }
 
-        final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3);
+        final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 4);
 
         if (reroute) {
             logger.debug("rerouting shards: [{}]", explanation);
@@ -373,6 +373,15 @@ public void onNewInfo(ClusterInfo info) {
         } else {
             listener.onResponse(null);
         }
+
+        // If all nodes are breaching the high disk watermark, apply an index create block to avoid a red cluster.
+        if (nodesOverHighThreshold.size() == nodes.size()) {
+            setIndexCreateBlock(listener, true);
+        } else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())) {
+            setIndexCreateBlock(listener, false);
+        } else {
+            listener.onResponse(null);
+        }
     }
 
     // exposed for tests to override
@@ -406,6 +415,27 @@ private void setLastRunTimeMillis() {
         lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
     }
 
+    protected void setIndexCreateBlock(final ActionListener<Void> listener, boolean indexCreateBlock) {
+        final ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
+            setLastRunTimeMillis();
+            listener.onResponse(r);
+        }, e -> {
+            logger.debug("setting index create block failed", e);
+            setLastRunTimeMillis();
+            listener.onFailure(e);
+        });
+
+        final Settings indexCreateBlockSetting = indexCreateBlock
+            ? Settings.builder().put(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey(), Boolean.TRUE.toString()).build()
+            : Settings.builder().putNull(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey()).build();
+
+        client.admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setPersistentSettings(indexCreateBlockSetting)
+            .execute(ActionListener.map(wrappedListener, r -> null));
+    }
+
     protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
         // set read-only block but don't block on the response
         ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
index 9f3f603e8edf3..4c4b362ef7c56 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
@@ -129,6 +129,11 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionLi
                 assertTrue(readOnly);
                 listener.onResponse(null);
             }
+
+            @Override
+            protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
+                listener.onResponse(null);
+            }
         };
 
         ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
@@ -185,6 +190,11 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionLi
                 assertTrue(readOnly);
                 listener.onResponse(null);
             }
+
+            @Override
+            protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
+                listener.onResponse(null);
+            }
         };
 
         indices.set(null);
@@ -372,6 +382,12 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
                 }
                 listener.onResponse(null);
             }
+
+            @Override
+            protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
+                listener.onResponse(null);
+            }
+
         };
         indicesToMarkReadOnly.set(null);
         indicesToRelease.set(null);
@@ -428,6 +444,11 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
                 }
                 listener.onResponse(null);
            }
+
+            @Override
+            protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
+                listener.onResponse(null);
+            }
         };
         // When free disk on any of node1 or node2 goes below 5% flood watermark, then apply index block on indices not having the block
         indicesToMarkReadOnly.set(null);
@@ -536,6 +557,11 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionLi
            long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
                return relocatingShardSizeRef.get();
            }
+
+            @Override
+            protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
+                listener.onResponse(null);
+            }
         };
 
         final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
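The create-index block introduced by this change is driven by the persistent cluster setting behind `Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING` (`cluster.blocks.create_index`): `DiskThresholdMonitor#setIndexCreateBlock` sets it to `true` once every node has breached the high disk watermark and clears it with `putNull` as soon as any node drops back below the threshold, so the block normally removes itself. The following minimal sketch mirrors that admin-client call to show how the block could also be lifted manually, for example from an operator tool or another test; the `IndexCreateBlockHelper` class and its method name are illustrative only and are not part of this change.

```java
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.settings.Settings;

// Illustrative helper: clears the persistent cluster.blocks.create_index setting
// using the same admin-client pattern as DiskThresholdMonitor#setIndexCreateBlock.
public final class IndexCreateBlockHelper {

    private IndexCreateBlockHelper() {}

    /** Removes the persistent create-index block setting, lifting the block cluster-wide. */
    public static void clearIndexCreateBlock(final Client client) {
        final Settings clearBlock = Settings.builder()
            .putNull(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey())
            .build();

        client.admin()
            .cluster()
            .prepareUpdateSettings()
            .setPersistentSettings(clearBlock)
            .get(); // blocks until the cluster state update is acknowledged
    }
}
```

The equivalent REST call would be a `PUT _cluster/settings` request with `"persistent": { "cluster.blocks.create_index": null }`.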