Skip to content

Commit

Permalink
Quieter logging from the DiskThresholdMonitor (#48115)
Browse files Browse the repository at this point in the history
Today if an Elasticsearch node reaches a disk watermark then it will repeatedly
emit logging about it, which implies that some action needs to be taken by the
administrator. This is misleading. Elasticsearch strives to keep nodes under
the high watermark, but it is normal to have a few nodes occasionally exceed
this level. Nodes may be over the low watermark for an extended period without
any ill effects.

This commit enhances the logging emitted by the `DiskThresholdMonitor` to be
less misleading. The expected case of hitting the high watermark and
immediately relocating one or more shards that to bring the node back under the
watermark again is reduced in severity to `INFO`. Additionally, `INFO` messages
are not emitted repeatedly.

Fixes #48038
  • Loading branch information
DaveCTurner authored Oct 18, 2019
1 parent cf386fb commit e16bb9a
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class BatchedRerouteService implements RerouteService {

private final Object mutex = new Object();
@Nullable // null if no reroute is currently pending
private List<ActionListener<Void>> pendingRerouteListeners;
private List<ActionListener<ClusterState>> pendingRerouteListeners;
private Priority pendingTaskPriority = Priority.LANGUID;

/**
Expand All @@ -65,8 +65,8 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterSt
* Initiates a reroute.
*/
@Override
public final void reroute(String reason, Priority priority, ActionListener<Void> listener) {
final List<ActionListener<Void>> currentListeners;
public final void reroute(String reason, Priority priority, ActionListener<ClusterState> listener) {
final List<ActionListener<ClusterState>> currentListeners;
synchronized (mutex) {
if (pendingRerouteListeners != null) {
if (priority.sameOrAfter(pendingTaskPriority)) {
Expand Down Expand Up @@ -148,7 +148,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
ActionListener.onResponse(currentListeners, null);
ActionListener.onResponse(currentListeners, newState);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Priority;

/**
Expand All @@ -33,5 +34,5 @@ public interface RerouteService {
* this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then
* the priority of the pending batch is raised to the given priority.
*/
void reroute(String reason, Priority priority, ActionListener<Void> listener);
void reroute(String reason, Priority priority, ActionListener<ClusterState> listener);
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -298,6 +299,24 @@ public TimeValue getRerouteInterval() {
return rerouteInterval;
}

String describeLowThreshold() {
return freeBytesThresholdLow.equals(ByteSizeValue.ZERO)
? Strings.format1Decimals(100.0 - freeDiskThresholdLow, "%")
: freeBytesThresholdLow.toString();
}

String describeHighThreshold() {
return freeBytesThresholdHigh.equals(ByteSizeValue.ZERO)
? Strings.format1Decimals(100.0 - freeDiskThresholdHigh, "%")
: freeBytesThresholdHigh.toString();
}

String describeFloodStageThreshold() {
return freeBytesThresholdFloodStage.equals(ByteSizeValue.ZERO)
? Strings.format1Decimals(100.0 - freeDiskThresholdFloodStage, "%")
: freeBytesThresholdFloodStage.toString();
}

/**
* Attempts to parse the watermark into a percentage, returning 100.0% if
* it cannot be parsed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,8 +817,9 @@ private void allocateUnassigned() {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}

final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
minNode.addShard(shard);
if (!shard.primary()) {
Expand All @@ -838,8 +839,9 @@ private void allocateUnassigned() {
if (minNode != null) {
// throttle decision scenario
assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
final RoutingNode node = minNode.getRoutingNode();
final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
Expand Down Expand Up @@ -86,10 +88,9 @@ public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings)
*
* If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards
*/
static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
boolean subtractShardsMovingAway, String dataPath) {
ClusterInfo clusterInfo = allocation.clusterInfo();
long totalSize = 0;
public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShardsMovingAway, String dataPath, ClusterInfo clusterInfo,
MetaData metaData, RoutingTable routingTable) {
long totalSize = 0L;

for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
if (routing.relocatingNodeId() == null) {
Expand All @@ -103,7 +104,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
// free space
if (actualPath == null || actualPath.equals(dataPath)) {
totalSize += getExpectedShardSize(routing, allocation, 0);
totalSize += getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable);
}
}

Expand All @@ -115,7 +116,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
}
if (dataPath.equals(actualPath)) {
totalSize -= getExpectedShardSize(routing, allocation, 0);
totalSize -= getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable);
}
}
}
Expand Down Expand Up @@ -239,7 +240,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
final long shardSize = getExpectedShardSize(shardRouting, allocation, 0);
final long shardSize = getExpectedShardSize(shardRouting, 0L,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
Expand Down Expand Up @@ -339,7 +341,8 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
}

final long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath());
final long relocatingShardsSize = sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(),
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
final DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
logger.trace("getDiskUsage: usage [{}] with [{}] bytes relocating yields [{}]",
Expand Down Expand Up @@ -418,29 +421,28 @@ private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<S
* Returns the expected shard size for the given shard or the default value provided if not enough information are available
* to estimate the shards size.
*/
public static long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) {
final IndexMetaData metaData = allocation.metaData().getIndexSafe(shard.index());
final ClusterInfo info = allocation.clusterInfo();
if (metaData.getResizeSourceIndex() != null && shard.active() == false &&
public static long getExpectedShardSize(ShardRouting shard, long defaultValue, ClusterInfo clusterInfo, MetaData metaData,
RoutingTable routingTable) {
final IndexMetaData indexMetaData = metaData.getIndexSafe(shard.index());
if (indexMetaData.getResizeSourceIndex() != null && shard.active() == false &&
shard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
// in the shrink index case we sum up the source index shards since we basically make a copy of the shard in
// the worst case
long targetShardSize = 0;
final Index mergeSourceIndex = metaData.getResizeSourceIndex();
final IndexMetaData sourceIndexMeta = allocation.metaData().index(mergeSourceIndex);
final Index mergeSourceIndex = indexMetaData.getResizeSourceIndex();
final IndexMetaData sourceIndexMeta = metaData.index(mergeSourceIndex);
if (sourceIndexMeta != null) {
final Set<ShardId> shardIds = IndexMetaData.selectRecoverFromShards(shard.id(),
sourceIndexMeta, metaData.getNumberOfShards());
for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) {
sourceIndexMeta, indexMetaData.getNumberOfShards());
for (IndexShardRoutingTable shardRoutingTable : routingTable.index(mergeSourceIndex.getName())) {
if (shardIds.contains(shardRoutingTable.shardId())) {
targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0);
targetShardSize += clusterInfo.getShardSize(shardRoutingTable.primaryShard(), 0);
}
}
}
return targetShardSize == 0 ? defaultValue : targetShardSize;
} else {
return info.getShardSize(shard, defaultValue);
return clusterInfo.getShardSize(shard, defaultValue);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC

private ShardStateAction.ShardStartedClusterStateTaskExecutor executor;

private static void neverReroutes(String reason, Priority priority, ActionListener<Void> listener) {
@SuppressWarnings("unused")
private static void neverReroutes(String reason, Priority priority, ActionListener<ClusterState> listener) {
fail("unexpectedly ran a deferred reroute");
}

Expand Down
Loading

0 comments on commit e16bb9a

Please sign in to comment.