Change DiskThresholdDecider's behavior when factoring in leaving shards
This changes DiskThresholdDecider to only factor in leaving shards when
checking whether a shard can remain. Previously, leaving shards were factored
into both the `canAllocate` and `canRemain` checks; with this change, their
sizes are subtracted only in the `canRemain` check.

Previously, multiple shards relocating away from a node could each have their
entire size subtracted, so the node could exceed the disk threshold (or even
fill the disk) because space still in use by in-progress relocations was being
treated as free.
dakrone authored and Isabel Drost-Fromm committed Sep 7, 2016
1 parent e59efaf commit 491edf7
Showing 2 changed files with 20 additions and 10 deletions.
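
To summarize the accounting difference described above, here is a minimal, self-contained sketch; the class, method, and parameter names are illustrative assumptions rather than the decider's actual fields, and watermarks are expressed as minimum free bytes for simplicity. The point it shows: canAllocate keeps shards that are leaving the node counted as used space, while canRemain credits their size back.

// Minimal sketch of the new accounting, with assumed names; this is not the
// actual DiskThresholdDecider implementation.
final class LeavingShardsAccountingSketch {

    // Free bytes after accounting for relocations: shards relocating onto the node
    // always count as used space; shards relocating away are credited back only
    // when subtractLeavingShards is true.
    static long adjustedFreeBytes(long reportedFreeBytes, long bytesRelocatingIn,
                                  long bytesRelocatingAway, boolean subtractLeavingShards) {
        long free = reportedFreeBytes - bytesRelocatingIn;
        if (subtractLeavingShards) {
            free += bytesRelocatingAway;
        }
        return free;
    }

    // canAllocate stays conservative: leaving shards still occupy disk right now.
    static boolean canAllocate(long reportedFree, long in, long away, long lowWatermarkFreeBytes) {
        return adjustedFreeBytes(reportedFree, in, away, false) >= lowWatermarkFreeBytes;
    }

    // canRemain may assume the leaving shards' space will eventually be released.
    static boolean canRemain(long reportedFree, long in, long away, long highWatermarkFreeBytes) {
        return adjustedFreeBytes(reportedFree, in, away, true) >= highWatermarkFreeBytes;
    }
}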
@@ -78,11 +78,10 @@ public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings)
* Returns the size of all shards that are currently being relocated to
* the node, but may not be finished transferring yet.
*
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
* of all shards
* If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards
*/
static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
boolean subtractShardsMovingAway, String dataPath) {
boolean subtractShardsMovingAway, String dataPath) {
ClusterInfo clusterInfo = allocation.clusterInfo();
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
@@ -111,7 +110,9 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow();
final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh();

DiskUsage usage = getDiskUsage(node, allocation, usages);
// subtractLeavingShards is passed as false here, because leaving shards still use disk space, and therefore we should be extra careful
// and take their size into account
DiskUsage usage = getDiskUsage(node, allocation, usages, false);
// First, check whether the node is currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
// Cache the used disk percentage for displaying disk percentages consistent with documentation
@@ -243,7 +244,9 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return decision;
}

final DiskUsage usage = getDiskUsage(node, allocation, usages);
// subtractLeavingShards is passed as true here: since this check is only for shards that remain, we will *eventually* have enough disk
// as the leaving shards move away. No new shards will be incoming, since canAllocate passes false for this check.
final DiskUsage usage = getDiskUsage(node, allocation, usages, true);
final String dataPath = clusterInfo.getDataPath(shardRouting);
// If this node is already above the high threshold, the shard cannot remain (get it off!)
final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
@@ -280,7 +283,8 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
"there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
}

private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
Expand All @@ -293,7 +297,7 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, I
}

if (diskThresholdSettings.includeRelocations()) {
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, true, usage.getPath());
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath());
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
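Before the test changes below, a simplified, stand-alone sketch of how the subtractShardsMovingAway flag is applied inside sizeOfRelocatingShards may help. ShardStub and its fields are hypothetical stand-ins for ShardRouting; the real method additionally filters shards by data path and resolves expected sizes from ClusterInfo.

import java.util.List;

// Hedged sketch only, not the actual helper from DiskThresholdDecider.
final class RelocationSizeSketch {
    enum State { INITIALIZING, RELOCATING }

    static final class ShardStub {
        final State state;
        final long expectedSizeInBytes;
        ShardStub(State state, long expectedSizeInBytes) {
            this.state = state;
            this.expectedSizeInBytes = expectedSizeInBytes;
        }
    }

    static long sizeOfRelocatingShards(List<ShardStub> shards, boolean subtractShardsMovingAway) {
        long totalSize = 0;
        for (ShardStub shard : shards) {
            if (shard.state == State.INITIALIZING) {
                // Shards being recovered onto the node will consume this much disk.
                totalSize += shard.expectedSizeInBytes;
            } else if (subtractShardsMovingAway && shard.state == State.RELOCATING) {
                // Shards moving away are credited back only for the canRemain path.
                totalSize -= shard.expectedSizeInBytes;
            }
        }
        return totalSize;
    }
}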
@@ -56,6 +56,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
@@ -729,10 +730,9 @@ public void testShardRelocationsTakenIntoAccount() {
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);

DiskThresholdDecider decider = makeDecider(diskSettings);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
makeDecider(diskSettings))));
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), decider)));

ClusterInfoService cis = new ClusterInfoService() {
@Override
@@ -832,17 +832,20 @@ public void testCanRemainWithShardRelocatingAway() {
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
shardSizesBuilder.put("[test][0][p]", 40L);
shardSizesBuilder.put("[test][1][p]", 40L);
shardSizesBuilder.put("[foo][0][p]", 10L);
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();

final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);

DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
.put(IndexMetaData.builder("foo").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.addAsNew(metaData.index("foo"))
.build();

DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), emptyMap(),
@@ -881,6 +884,7 @@ public void testCanRemainWithShardRelocatingAway() {
// Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED);
secondRouting = TestShardRouting.newShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING);
ShardRouting fooRouting = TestShardRouting.newShardRouting("foo", 0, "node1", null, true, ShardRoutingState.UNASSIGNED);
firstRoutingNode = new RoutingNode("node1", discoveryNode1, firstRouting, secondRouting);
builder = RoutingTable.builder().add(
IndexRoutingTable.builder(firstRouting.index())
Expand All @@ -898,6 +902,8 @@ public void testCanRemainWithShardRelocatingAway() {
false);
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));
decision = diskThresholdDecider.canAllocate(fooRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));

// Creating AllocationService instance and the services it depends on...
ClusterInfoService cis = new ClusterInfoService() {
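Worth noting: this relocation-aware accounting only applies while relocations are included in disk usage (the diskThresholdSettings.includeRelocations() check above), which is controlled by the cluster.routing.allocation.disk.include_relocations setting and enabled by default. A sketch of the relevant settings, assuming the standard disk-threshold setting keys for this version:

import org.elasticsearch.common.settings.Settings;

// Sketch only: include_relocations defaults to true, so the new
// subtractLeavingShards handling is active without any configuration change.
final class DiskThresholdSettingsSketch {
    static Settings diskSettings() {
        return Settings.builder()
                .put("cluster.routing.allocation.disk.threshold_enabled", true)
                .put("cluster.routing.allocation.disk.include_relocations", true)
                .put("cluster.routing.allocation.disk.watermark.low", "70%")
                .put("cluster.routing.allocation.disk.watermark.high", "80%")
                .build();
    }
}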
