Try to fix DiskThresholdDeciderIT (elastic#63614)
This is another attempt to fix elastic#62326 as my previous 
attempts failed (elastic#63112, elastic#63385).
tlrx committed Oct 14, 2020
1 parent e4b4351 commit d8fd8e6
Showing 2 changed files with 28 additions and 20 deletions.
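
The change replaces the one-shot "refresh disk usage, then busy-assert" sequence with a helper that refreshes the master's ClusterInfoService on every retry before re-checking the shard assignments. As a rough illustration of that retry-with-refresh pattern only (a standalone sketch, not the actual test code; assertBusyWithRefresh, refresh and condition are made-up placeholders rather than Elasticsearch APIs):

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public final class AssertBusyWithRefreshSketch {

    // Retry the condition until the deadline, forcing a refresh before every attempt.
    static void assertBusyWithRefresh(Runnable refresh, BooleanSupplier condition,
                                      long timeout, TimeUnit unit) throws InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        do {
            refresh.run();                  // force a fresh disk-usage snapshot before each check
            if (condition.getAsBoolean()) {
                return;                     // assertion satisfied
            }
            Thread.sleep(100L);             // back off briefly between attempts
        } while (System.nanoTime() < deadline);
        throw new AssertionError("condition not satisfied within " + timeout + " " + unit);
    }

    public static void main(String[] args) throws InterruptedException {
        // Toy usage: each "refresh" eventually makes the condition true.
        final int[] refreshes = {0};
        assertBusyWithRefresh(() -> refreshes[0]++, () -> refreshes[0] >= 3, 30, TimeUnit.SECONDS);
        System.out.println("condition met after " + refreshes[0] + " refreshes");
    }
}

In the real test below, the refresh step is InternalClusterInfoService#refresh() via refreshDiskUsage(), and the condition is a Hamcrest matcher over the shard routings assigned to the shrunken data node.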
DiskThresholdDeciderIT.java
@@ -53,6 +53,7 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
+ import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Before;

@@ -72,6 +73,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -136,16 +138,15 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(InternalSettingsPlugin.class);
}

- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62326")
public void testHighWatermarkNotExceeded() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String dataNodeName = internalCluster().startDataOnlyNode();
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService
- = (InternalClusterInfoService) internalCluster().getMasterNodeInstance(ClusterInfoService.class);
- internalCluster().getMasterNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
+ = (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
+ internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
@@ -161,13 +162,11 @@ public void testHighWatermarkNotExceeded() throws Exception {
// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
- refreshDiskUsage();
- assertBusy(() -> assertThat(getShardRoutings(dataNode0Id, indexName), empty()));
+ assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, empty());

// increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES + 1L);
- refreshDiskUsage();
- assertBusy(() -> assertThat(getShardRoutings(dataNode0Id, indexName), hasSize(1)));
+ assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
}

public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
@@ -183,8 +182,8 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
.put("compress", randomBoolean())));

final InternalClusterInfoService clusterInfoService
- = (InternalClusterInfoService) internalCluster().getMasterNodeInstance(ClusterInfoService.class);
- internalCluster().getMasterNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
+ = (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
+ internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
@@ -231,8 +230,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {

// increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES + 1L);
- refreshDiskUsage();
- assertBusy(() -> assertThat(getShardRoutings(dataNode0Id, indexName), hasSize(1)));
+ assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
}

private Set<ShardRouting> getShardRoutings(final String nodeId, final String indexName) {
@@ -277,14 +275,7 @@ private long createReasonableSizedShards(final String indexName) throws InterruptedException {
}

private void refreshDiskUsage() {
- assertFalse(client().admin().cluster().prepareHealth()
- .setWaitForEvents(Priority.LANGUID)
- .setWaitForNoRelocatingShards(true)
- .setWaitForNoInitializingShards(true)
- .get()
- .isTimedOut());
-
- final ClusterInfoService clusterInfoService = internalCluster().getMasterNodeInstance(ClusterInfoService.class);
+ final ClusterInfoService clusterInfoService = internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
((InternalClusterInfoService) clusterInfoService).refresh();
// if the nodes were all under the low watermark already (but unbalanced) then a change in the disk usage doesn't trigger a reroute
// even though it's now possible to achieve better balance, so we have to do an explicit reroute. TODO fix this?
@@ -301,6 +292,22 @@ private void refreshDiskUsage() {
.isTimedOut());
}

+ private void assertBusyWithDiskUsageRefresh(
+ String nodeName,
+ String indexName,
+ Matcher<? super Set<ShardRouting>> matcher
+ ) throws Exception {
+ assertBusy(() -> {
+ // refresh the master's ClusterInfoService before checking the assigned shards because DiskThresholdMonitor might still
+ // be processing a previous ClusterInfo update and will skip the new one (see DiskThresholdMonitor#onNewInfo(ClusterInfo)
+ // and its internal checkInProgress flag)
+ refreshDiskUsage();
+
+ final Set<ShardRouting> shardRoutings = getShardRoutings(nodeName, indexName);
+ assertThat("Mismatching shard routings: " + shardRoutings, shardRoutings, matcher);
+ }, 30L, TimeUnit.SECONDS);
+ }

private static class TestFileStore extends FilterFileStore {

private final Path path;
DiskThresholdMonitor.java
@@ -114,7 +114,8 @@ private void checkFinished() {
}

public void onNewInfo(ClusterInfo info) {

+ // TODO find a better way to limit concurrent updates (and potential associated reroutes) while allowing tests to ensure that
+ // all ClusterInfo updates are processed and never ignored
if (checkInProgress.compareAndSet(false, true) == false) {
logger.info("skipping monitor as a check is already in progress");
return;
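
The second file shows why the retry is needed: DiskThresholdMonitor#onNewInfo drops any ClusterInfo update that arrives while a previous check is still running, guarded by the checkInProgress compareAndSet shown above. A minimal standalone sketch of that guard pattern (a hypothetical class, not the real monitor, which also performs the actual disk-threshold checks and reroutes):

import java.util.concurrent.atomic.AtomicBoolean;

public final class ClusterInfoGuardSketch {

    private final AtomicBoolean checkInProgress = new AtomicBoolean();

    void onNewInfo(String info) {
        // If another check is already running, this update is dropped, not queued.
        if (checkInProgress.compareAndSet(false, true) == false) {
            System.out.println("skipping " + info + " as a check is already in progress");
            return;
        }
        try {
            System.out.println("processing " + info);
            Thread.sleep(200L);             // pretend the check (and any reroute) takes a while
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            checkInProgress.set(false);     // allow the next update through
        }
    }

    public static void main(String[] args) throws Exception {
        ClusterInfoGuardSketch monitor = new ClusterInfoGuardSketch();
        Thread first = new Thread(() -> monitor.onNewInfo("update-1"));
        first.start();
        Thread.sleep(50L);                  // let the first check get started
        monitor.onNewInfo("update-2");      // arrives mid-check and is skipped
        first.join();
    }
}

Because skipped updates are never queued, a single refresh issued before the assertion can be lost entirely, which is why the test now refreshes again on every assertBusy attempt instead of refreshing once up front.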