Add a setting to control auto release of OpenSearch Monitors managed index creation block (#6277) (#6284)

* Add a setting to control auto release of OpenSearch managed block


(cherry picked from commit ca9c1ad)

Signed-off-by: Rishav Sagar <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 9553038 commit 8b6d95f
Showing 5 changed files with 129 additions and 42 deletions.
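
For context: the change introduces a dynamic cluster setting, cluster.blocks.create_index.auto_release (default true), that controls whether the disk threshold monitor automatically removes the cluster-wide index-creation block once at least one data node drops back below the high watermark. A minimal sketch of disabling it, written in the same test-client style the integration tests below use (the surrounding test harness is assumed):

// Sketch: persistently disable auto-release of the create-index block.
// The setting key and the prepareUpdateSettings()/setPersistentSettings() calls are taken from this diff.
Settings disableAutoRelease = Settings.builder()
    .put(DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), false)
    .build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(disableAutoRelease).get());
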
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cancellation of in-flight SearchTasks based on resource consumption ([#5606](https://github.com/opensearch-project/OpenSearch/pull/5605))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- [Segment Replication] Add primary weight factor for balanced primary distribution ([#6017](https://github.com/opensearch-project/OpenSearch/pull/6017))
- Add a setting to control auto release of OpenSearch managed index creation block ([#6277](https://github.com/opensearch-project/OpenSearch/pull/6277))

### Dependencies
- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704))
(next changed file)
@@ -39,7 +39,6 @@

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.index.IndexRequestBuilder;
@@ -66,6 +65,7 @@
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.fs.FsRepository;
@@ -90,13 +90,11 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -137,6 +135,7 @@ public void removeFilesystemProvider() {
}

private static final long WATERMARK_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(100, ByteSizeUnit.KB).getBytes();
private static final String INDEX_ROUTING_ALLOCATION_NODE_SETTING = "index.routing.allocation.include._name";

@Override
@@ -197,14 +196,13 @@ public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Excep
.build();

internalCluster().startClusterManagerOnlyNode(settings);
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// Reduce disk space on all nodes until all of them breach the high disk watermark.
for (final String dataNodeName : dataNodeNames) {
populateNode(dataNodeName);
}

getMockInternalClusterInfoService().refresh();
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
);
assertBusy(() -> {
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
@@ -225,39 +223,111 @@ public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throw
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() throws Exception {
public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermarkWithAutoReleaseEnabled() throws Exception {
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
final List<String> indexNames = new ArrayList<>();
internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// Reduce disk space on all nodes until all of them breach the high disk watermark.
for (final String dataNodeName : dataNodeNames) {
final String indexName = populateNode(dataNodeName);
indexNames.add(indexName);
}
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
);

getMockInternalClusterInfoService().refresh();
// Validate if cluster block is applied on the cluster
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// Delete indices to free space
deleteIndices(indexNames);
getMockInternalClusterInfoService().refresh();
// Free all the space
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
);

// Validate if index create block is removed on the cluster
assertBusy(() -> {
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);
}

public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermarkWithAutoReleaseDisabled() throws Exception {
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.put(DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), false)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// Reduce disk space on all nodes until all of them breach the high disk watermark
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
);

// Validate if cluster block is applied on the cluster
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// Free all the space
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
);

// Validate index create block is not removed on the cluster
assertBusy(() -> {
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);
}

public void testDiskMonitorAppliesBlockBackWhenUserRemovesIndexCreateBlock() throws Exception {
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.put(DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), false)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);

// User applies index create block.
Settings createBlockSetting = Settings.builder().put(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey(), "true").build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(createBlockSetting).get());
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// Reduce disk space on all nodes until all of them breach the high disk watermark and DiskThresholdMonitor applies the block.
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
);

// Validate if cluster block is applied on the cluster
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);

// User removes the block.
Settings removeBlockSetting = Settings.builder().put(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey(), false).build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(removeBlockSetting).get());

// Refresh so that DiskThresholdMonitor kicks in and applies block.
getMockInternalClusterInfoService().refresh();
// Validate index create block is not removed on the cluster
assertBusy(() -> {
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);
}

public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
final Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
@@ -267,20 +337,22 @@ public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2, settings);
ensureStableCluster(3);

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// Create an index on one of the data nodes.
final String indexName = populateNode(dataNodeNames.get(0));
// Reduce disk space on all other nodes until all of them breach the high disk watermark
for (int i = 1; i < dataNodeNames.size(); i++) {
populateNode(dataNodeNames.get(i));
}

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createAndPopulateIndex(indexName, dataNodeNames.get(0));
// Apply a read_only_allow_delete block on one of the indices
// (can happen if the corresponding node has breached flood stage watermark).
final Settings readOnlySettings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
.build();
client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get();
getMockInternalClusterInfoService().refresh();

// Reduce disk space on all nodes until all of them breach the high disk watermark.
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
);

// Validate index create block is applied on the cluster
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
@@ -365,18 +437,9 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
}

private void deleteIndices(final List<String> indexNames) throws ExecutionException, InterruptedException {
for (String indexName : indexNames) {
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName)).get());
assertFalse("index [" + indexName + "] should have been deleted", indexExists(indexName));
}
}

private String populateNode(final String dataNodeName) throws Exception {
final Path dataNodePath = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
long minShardSize = createAndPopulateIndex(indexName, dataNodeName);
fileSystemProvider.getTestFileStore(dataNodePath).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
createAndPopulateIndex(indexName, dataNodeName);
return indexName;
}

@@ -466,6 +529,10 @@ private long createReasonableSizedShards(final String indexName) throws Interrup
}
}

private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) {
return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes);
}

private void refreshDiskUsage() {
final ClusterInfoService clusterInfoService = internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
((InternalClusterInfoService) clusterInfoService).refresh();
(next changed file)
@@ -404,11 +404,12 @@ public void onNewInfo(ClusterInfo info) {
// If all the nodes are breaching high disk watermark, we apply index create block to avoid red clusters.
if (nodesOverHighThreshold.size() == nodes.size()) {
setIndexCreateBlock(listener, true);
} else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())) {
setIndexCreateBlock(listener, false);
} else {
listener.onResponse(null);
}
} else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())
&& diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()) {
setIndexCreateBlock(listener, false);
} else {
listener.onResponse(null);
}
}

// exposed for tests to override
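
Read as the resulting control flow, the tail of onNewInfo now behaves as in this simplified sketch (the enclosing condition and braces of the method are elided):

if (nodesOverHighThreshold.size() == nodes.size()) {
    // Every data node breaches the high watermark: apply the cluster-wide create-index block.
    setIndexCreateBlock(listener, true);
} else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())
    && diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()) {
    // At least one node is back under the watermark and auto-release is enabled: lift the block.
    setIndexCreateBlock(listener, false);
} else {
    // Otherwise leave cluster state untouched, e.g. when cluster.blocks.create_index.auto_release is false.
    listener.onResponse(null);
}
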
(next changed file)
@@ -96,6 +96,12 @@ public class DiskThresholdSettings {
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final Setting<Boolean> CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE = Setting.boolSetting(
"cluster.blocks.create_index.auto_release",
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile String lowWatermarkRaw;
private volatile String highWatermarkRaw;
@@ -104,6 +110,7 @@ public class DiskThresholdSettings {
private volatile ByteSizeValue freeBytesThresholdLow;
private volatile ByteSizeValue freeBytesThresholdHigh;
private volatile boolean includeRelocations;
private volatile boolean createIndexBlockAutoReleaseEnabled;
private volatile boolean enabled;
private volatile TimeValue rerouteInterval;
private volatile Double freeDiskThresholdFloodStage;
@@ -134,12 +141,14 @@ public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings)
this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
this.createIndexBlockAutoReleaseEnabled = CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStage);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE, this::setCreateIndexBlockAutoReleaseEnabled);
}

/**
@@ -333,6 +342,10 @@ private void setFloodStage(String floodStageRaw) {
);
}

private void setCreateIndexBlockAutoReleaseEnabled(boolean createIndexBlockAutoReleaseEnabled) {
this.createIndexBlockAutoReleaseEnabled = createIndexBlockAutoReleaseEnabled;
}

/**
* Gets the raw (uninterpreted) low watermark value as found in the settings.
*/
@@ -387,6 +400,10 @@ public TimeValue getRerouteInterval() {
return rerouteInterval;
}

public boolean isCreateIndexBlockAutoReleaseEnabled() {
return createIndexBlockAutoReleaseEnabled;
}
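
As a quick illustration (not part of the change), the setting defaults to true and can be read straight off a Settings instance; at runtime the update consumer registered in the constructor keeps the field behind isCreateIndexBlockAutoReleaseEnabled() in sync:

// Sketch: default vs. explicit value of the new setting.
boolean byDefault = DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.get(Settings.EMPTY);   // true
Settings disabled = Settings.builder()
    .put(DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), false)
    .build();
boolean whenDisabled = DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.get(disabled);      // false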

String describeLowThreshold() {
return freeBytesThresholdLow.equals(ByteSizeValue.ZERO)
? Strings.format1Decimals(100.0 - freeDiskThresholdLow, "%")
(next changed file)
@@ -281,6 +281,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,

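A closing note on the test changes: instead of filling real shards until nodes run out of space, the integration tests now stub per-node disk statistics through MockInternalClusterInfoService, which makes the watermark transitions deterministic. The pattern, condensed from the test file above (ellipses mark omitted assertions):

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// Simulate every data node breaching the high watermark.
clusterInfoService.setDiskUsageFunctionAndRefresh(
    (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
);
// ... assert that the create-index block gets applied ...
// Simulate all disk space being freed again.
clusterInfoService.setDiskUsageFunctionAndRefresh(
    (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
);
// ... assert that the block is released (auto_release true) or kept (auto_release false) ...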