Skip to content

Commit

Permalink
[7.13] [ML] prevent accidentally asking for more resources when scali…
Browse files Browse the repository at this point in the history
…ng down and improve scaling size estimations (#74691) (#74782)

* [ML] prevent accidentally asking for more resources when scaling down and improve scaling size estimations (#74691)

This commit addresses two problems:

 - Our memory estimations are not very exact. Consequently, its possible to request for too much or too little by a handful of KBs, while this is not a large issue in ESS, for custom tier sizes, it may be.
 - When scaling down, it was possible that part of the scale down was actually a scale up! This was due to some floating point rounding errors and poor estimations. Even though are estimations are better, it is best to NOT request higher resources in a scale down, no matter what.

One of the ways we improve the calculation is during JVM size calculations. Instead of having the knot point be `2gb` it has been changed to `1.2gb`. This accounts for the "window of uncertainty" for JVM sizes.

closes: #74709
  • Loading branch information
benwtrent authored Jul 1, 2021
1 parent 42fb9ec commit e31d5f8
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;

public class AutoscalingIT extends MlNativeAutodetectIntegTestCase {
Expand All @@ -46,25 +46,38 @@ public class AutoscalingIT extends MlNativeAutodetectIntegTestCase {

// This test assumes that xpack.ml.max_machine_memory_percent is 30
// and that xpack.ml.use_auto_machine_memory_percent is false
public void testMLAutoscalingCapacity() {
public void testMLAutoscalingCapacity() throws Exception {
SortedMap<String, Settings> deciders = new TreeMap<>();
deciders.put(MlAutoscalingDeciderService.NAME,
Settings.builder().put(MlAutoscalingDeciderService.DOWN_SCALE_DELAY.getKey(), TimeValue.ZERO).build());
final PutAutoscalingPolicyAction.Request request = new PutAutoscalingPolicyAction.Request(
"ml_test",
new TreeSet<>(org.elasticsearch.common.collect.Set.of("ml")),
new TreeSet<>(Arrays.asList(
"transform",
"data_frozen",
"master",
"remote_cluster_client",
"data",
"ml",
"data_content",
"data_hot",
"data_warm",
"data_cold",
"ingest"
)),
deciders
);
assertAcked(client().execute(PutAutoscalingPolicyAction.INSTANCE, request).actionGet());

assertMlCapacity(
assertBusy(() -> assertMlCapacity(
client().execute(
GetAutoscalingCapacityAction.INSTANCE,
new GetAutoscalingCapacityAction.Request()
).actionGet(),
"Requesting scale down as tier and/or node size could be smaller",
0L,
0L);
0L)
);

putJob("job1", 100);
putJob("job2", 200);
Expand Down Expand Up @@ -151,8 +164,8 @@ private void assertMlCapacity(GetAutoscalingCapacityAction.Response capacity, St

AutoscalingDeciderResult autoscalingDeciderResult = autoscalingDeciderResults.results().get("ml");
assertThat(autoscalingDeciderResult.reason().summary(), containsString(reason));
assertThat(autoscalingDeciderResult.requiredCapacity().total().memory().getBytes(), equalTo(tierBytes));
assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), equalTo(nodeBytes));
assertThat(autoscalingDeciderResult.requiredCapacity().total().memory().getBytes(), greaterThanOrEqualTo(tierBytes - 1L));
assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), greaterThanOrEqualTo(nodeBytes - 1L));
}

private void putJob(String jobId, long limitMb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
private static final Duration DEFAULT_MEMORY_REFRESH_RATE = Duration.ofMinutes(15);
private static final String MEMORY_STALE = "unable to make scaling decision as job memory requirements are stale";
private static final long NO_SCALE_DOWN_POSSIBLE = -1L;
// If ensureScaleDown changes the calculation by more than this much, log the error
private static final long ACCEPTABLE_DIFFERENCE = ByteSizeValue.ofMb(1).getBytes();

public static final String NAME = "ml";
public static final Setting<Integer> NUM_ANOMALY_JOBS_IN_QUEUE = Setting.intSetting("num_anomaly_jobs_in_queue", 0, 0);
Expand Down Expand Up @@ -359,6 +361,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider

final List<DiscoveryNode> nodes = getNodes(clusterState);
final NativeMemoryCapacity currentScale = currentScale(nodes);

final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder()
.setWaitingAnomalyJobs(waitingAnomalyJobs)
.setWaitingAnalyticsJobs(waitingAnalyticsJobs)
Expand Down Expand Up @@ -497,9 +500,21 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
.build()));
}

final Optional<AutoscalingDeciderResult> scaleDownDecision = checkForScaleDown(nodeLoads, largestJob, currentScale, reasonBuilder);
final Optional<AutoscalingDeciderResult> maybeScaleDown = checkForScaleDown(nodeLoads, largestJob, currentScale, reasonBuilder)
// Due to weird rounding errors, it may be that a scale down result COULD cause a scale up
// Ensuring the scaleDown here forces the scale down result to always be lower than the current capacity.
// This is safe as we know that ALL jobs are assigned at the current capacity
.map(result -> {
AutoscalingCapacity capacity = ensureScaleDown(result.requiredCapacity(), context.currentCapacity());
if (capacity == null) {
return null;
}
return new AutoscalingDeciderResult(capacity, result.reason());
});

if (maybeScaleDown.isPresent()) {
final AutoscalingDeciderResult scaleDownDecisionResult = maybeScaleDown.get();

if (scaleDownDecision.isPresent()) {
// Given maxOpenJobs, could we scale down to just one node?
// We have no way of saying "we need X nodes"
if (nodeLoads.size() > 1) {
Expand All @@ -516,7 +531,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
MAX_OPEN_JOBS_PER_NODE.getKey());
logger.info(() -> new ParameterizedMessage("{} Calculated potential scaled down capacity [{}] ",
msg,
scaleDownDecision.get().requiredCapacity()));
scaleDownDecisionResult.requiredCapacity()));
return new AutoscalingDeciderResult(context.currentCapacity(), reasonBuilder.setSimpleReason(msg).build());
}
}
Expand All @@ -528,14 +543,14 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
TimeValue downScaleDelay = DOWN_SCALE_DELAY.get(configuration);
long msLeftToScale = downScaleDelay.millis() - (now - scaleDownDetected);
if (msLeftToScale <= 0) {
return scaleDownDecision.get();
return scaleDownDecisionResult;
}
logger.debug(() -> new ParameterizedMessage(
"not scaling down as the current scale down delay [{}] is not satisfied." +
" The last time scale down was detected [{}]. Calculated scaled down capacity [{}] ",
downScaleDelay.getStringRep(),
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(scaleDownDetected),
scaleDownDecision.get().requiredCapacity()));
scaleDownDecisionResult.requiredCapacity()));
return new AutoscalingDeciderResult(
context.currentCapacity(),
reasonBuilder
Expand All @@ -560,6 +575,31 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
.build()));
}

static AutoscalingCapacity ensureScaleDown(AutoscalingCapacity scaleDownResult, AutoscalingCapacity currentCapacity) {
if (currentCapacity == null || scaleDownResult == null) {
return null;
}
AutoscalingCapacity newCapacity = new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(
currentCapacity.total().storage(),
ByteSizeValue.ofBytes(Math.min(scaleDownResult.total().memory().getBytes(), currentCapacity.total().memory().getBytes()))
),
new AutoscalingCapacity.AutoscalingResources(
currentCapacity.node().storage(),
ByteSizeValue.ofBytes(Math.min(scaleDownResult.node().memory().getBytes(), currentCapacity.node().memory().getBytes()))
)
);
if (scaleDownResult.node().memory().getBytes() - newCapacity.node().memory().getBytes() > ACCEPTABLE_DIFFERENCE
|| scaleDownResult.total().memory().getBytes() - newCapacity.total().memory().getBytes() > ACCEPTABLE_DIFFERENCE) {
logger.warn(
"scale down accidentally requested a scale up, auto-corrected; initial scaling [{}], corrected [{}]",
scaleDownResult,
newCapacity
);
}
return newCapacity;
}

AutoscalingDeciderResult noScaleResultOrRefresh(MlScalingReason.Builder reasonBuilder,
boolean memoryTrackingStale,
AutoscalingDeciderResult potentialResult) {
Expand Down Expand Up @@ -816,8 +856,11 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<NodeLoad> nodeLoads,
// Or our largest job could be on a smaller node (meaning the same size tier but smaller nodes are possible).
if (currentlyNecessaryTier < currentCapacity.getTier() || currentlyNecessaryNode < currentCapacity.getNode()) {
NativeMemoryCapacity nativeMemoryCapacity = new NativeMemoryCapacity(
currentlyNecessaryTier,
currentlyNecessaryNode,
// Since we are in the `scaleDown` branch, we know jobs are running and we could be smaller
// If we have some weird rounding errors, it may be that the `currentlyNecessary` values are larger than
// current capacity. We never want to accidentally say "scale up" via a scale down.
Math.min(currentlyNecessaryTier, currentCapacity.getTier()),
Math.min(currentlyNecessaryNode, currentCapacity.getNode()),
// If our newly suggested native capacity is the same, we can use the previously stored jvm size
currentlyNecessaryNode == currentCapacity.getNode() ? currentCapacity.getJvmSize() : null);
AutoscalingCapacity requiredCapacity = nativeMemoryCapacity.autoscalingCapacity(maxMachineMemoryPercent, useAuto);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;

import java.util.Objects;
import java.util.Optional;

import static org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator.dynamicallyCalculateJvmSizeFromNativeMemorySize;

// Used for storing native memory capacity and then transforming it into an autoscaling capacity
// which takes into account the whole node size
Expand Down Expand Up @@ -49,22 +52,26 @@ NativeMemoryCapacity merge(NativeMemoryCapacity nativeMemoryCapacity) {
return this;
}

AutoscalingCapacity autoscalingCapacity(int maxMemoryPercent, boolean useAuto) {
public AutoscalingCapacity autoscalingCapacity(int maxMemoryPercent, boolean useAuto) {
// We calculate the JVM size here first to ensure it stays the same given the rest of the calculations
final Long jvmSize = useAuto ?
Optional.ofNullable(this.jvmSize).orElse(dynamicallyCalculateJvmSizeFromNativeMemorySize(node)) :
null;
// We first need to calculate the actual node size given the current native memory size.
// This way we can accurately determine the required node size AND what the overall memory percentage will be
long actualNodeSize = NativeMemoryCalculator.calculateApproxNecessaryNodeSize(node, jvmSize, maxMemoryPercent, useAuto);
// We make the assumption that the JVM size is the same across the entire tier
// This simplifies calculating the tier as it means that each node in the tier
// will have the same dynamic memory calculation. And thus the tier is simply the sum of the memory necessary
// times that scaling factor.
int memoryPercentForMl = (int)Math.floor(NativeMemoryCalculator.modelMemoryPercent(
double memoryPercentForMl = NativeMemoryCalculator.modelMemoryPercent(
actualNodeSize,
jvmSize,
maxMemoryPercent,
useAuto
));
);
double inverseScale = memoryPercentForMl <= 0 ? 0 : 100.0 / memoryPercentForMl;
long actualTier = (long)Math.ceil(tier * inverseScale);
long actualTier = Math.round(tier * inverseScale);
return new AutoscalingCapacity(
// Tier should always be AT LEAST the largest node size.
// This Math.max catches any strange rounding errors or weird input.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

public final class NativeMemoryCalculator {

private static final long STATIC_JVM_UPPER_THRESHOLD = ByteSizeValue.ofGb(2).getBytes();
static final long MINIMUM_AUTOMATIC_NODE_SIZE = ByteSizeValue.ofGb(1).getBytes();
private static final long OS_OVERHEAD = ByteSizeValue.ofMb(200L).getBytes();

Expand Down Expand Up @@ -80,15 +81,11 @@ public static long calculateApproxNecessaryNodeSize(long nativeMachineMemory, Lo
if (useAuto) {
// TODO utilize official ergonomic JVM size calculations when available.
jvmSize = jvmSize == null ? dynamicallyCalculateJvmSizeFromNativeMemorySize(nativeMachineMemory) : jvmSize;
// We use a Math.floor here to ensure we have AT LEAST enough memory given rounding.
int modelMemoryPercent = (int)Math.floor(modelMemoryPercent(
nativeMachineMemory + jvmSize + OS_OVERHEAD,
jvmSize,
maxMemoryPercent,
true));
// We calculate the inverse percentage of `nativeMachineMemory + OS_OVERHEAD` as `OS_OVERHEAD` is always present
// on the native memory side and we need to account for it when we invert the model memory percentage
return Math.max((long)Math.ceil((100.0/modelMemoryPercent) * (nativeMachineMemory + OS_OVERHEAD)), MINIMUM_AUTOMATIC_NODE_SIZE);
// We haven't reached our 90% threshold, so, simply summing up the values is adequate
if ((jvmSize + OS_OVERHEAD)/(double)nativeMachineMemory > 0.1) {
return Math.max(nativeMachineMemory + jvmSize + OS_OVERHEAD, MINIMUM_AUTOMATIC_NODE_SIZE);
}
return Math.round((nativeMachineMemory/0.9));
}
return (long) ((100.0/maxMemoryPercent) * nativeMachineMemory);
}
Expand Down Expand Up @@ -118,18 +115,11 @@ public static double modelMemoryPercent(long machineMemory, Long jvmSize, int ma
return maxMemoryPercent;
}

public static int modelMemoryPercent(long machineMemory, int maxMemoryPercent, boolean useAuto) {
return (int)Math.ceil(modelMemoryPercent(machineMemory,
null,
maxMemoryPercent,
useAuto));
}

private static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxMemoryPercent, boolean useAuto) {
static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxMemoryPercent, boolean useAuto) {
if (useAuto && jvmSize != null) {
// It is conceivable that there is a machine smaller than 200MB.
// If the administrator wants to use the auto configuration, the node should be larger.
if (machineMemory - jvmSize < OS_OVERHEAD || machineMemory == 0) {
if (machineMemory - jvmSize <= OS_OVERHEAD || machineMemory == 0) {
return machineMemory / 100;
}
// This calculation is dynamic and designed to maximally take advantage of the underlying machine for machine learning
Expand All @@ -139,8 +129,8 @@ private static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxM
// 2GB node -> 66%
// 16GB node -> 87%
// 64GB node -> 90%
long memoryPercent = Math.min(90, (int)Math.ceil(((machineMemory - jvmSize - OS_OVERHEAD) / (double)machineMemory) * 100.0D));
return (long)(machineMemory * (memoryPercent / 100.0));
double memoryProportion = Math.min(0.90, (machineMemory - jvmSize - OS_OVERHEAD) / (double)machineMemory);
return Math.round(machineMemory * memoryProportion);
}

return (long)(machineMemory * (maxMemoryPercent / 100.0));
Expand All @@ -154,30 +144,33 @@ public static long allowedBytesForMl(long machineMemory, int maxMemoryPercent, b
}

// TODO replace with official ergonomic calculation
private static long dynamicallyCalculateJvmSizeFromNodeSize(long nodeSize) {
if (nodeSize < ByteSizeValue.ofGb(2).getBytes()) {
return (long) (nodeSize * 0.40);
public static long dynamicallyCalculateJvmSizeFromNodeSize(long nodeSize) {
// While the original idea here was to predicate on 2Gb, it has been found that the knot points of
// 2GB and 8GB cause weird issues where the JVM size will "jump the gap" from one to the other when
// considering true tier sizes in elastic cloud.
if (nodeSize < ByteSizeValue.ofMb(1280).getBytes()) {
return (long)(nodeSize * 0.40);
}
if (nodeSize < ByteSizeValue.ofGb(8).getBytes()) {
return (long) (nodeSize * 0.25);
return (long)(nodeSize * 0.25);
}
return ByteSizeValue.ofGb(2).getBytes();
return STATIC_JVM_UPPER_THRESHOLD;
}

private static long dynamicallyCalculateJvmSizeFromNativeMemorySize(long nativeMachineMemory) {
public static long dynamicallyCalculateJvmSizeFromNativeMemorySize(long nativeMachineMemory) {
// See dynamicallyCalculateJvm the following JVM calculations are arithmetic inverses of JVM calculation
//
// Example: For < 2GB node, the JVM is 0.4 * total_node_size. This means, the rest is 0.6 the node size.
// So, the `nativeAndOverhead` is == 0.6 * total_node_size => total_node_size = (nativeAndOverHead / 0.6)
// Consequently jvmSize = (nativeAndOverHead / 0.6)*0.4 = nativeAndOverHead * 2/3
long nativeAndOverhead = nativeMachineMemory + OS_OVERHEAD;
if (nativeAndOverhead < (ByteSizeValue.ofGb(2).getBytes() * 0.60)) {
return (long) Math.ceil(nativeAndOverhead * (2.0 / 3.0));
return Math.round((nativeAndOverhead / 0.6) * 0.4);
}
if (nativeAndOverhead < (ByteSizeValue.ofGb(8).getBytes() * 0.75)) {
return (long) Math.ceil(nativeAndOverhead / 3.0);
return Math.round((nativeAndOverhead / 0.75) * 0.25);
}
return ByteSizeValue.ofGb(2).getBytes();
return STATIC_JVM_UPPER_THRESHOLD;
}

}
Loading

0 comments on commit e31d5f8

Please sign in to comment.