Skip to content

Commit

Permalink
YARN-10965. Implement strict resource iteration order
Browse files Browse the repository at this point in the history
  • Loading branch information
9uapaw committed Nov 16, 2021
1 parent 83aabd0 commit 3dae1dc
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public float calculateMinimumResource(
QueueCapacityUpdateContext updateContext, CSQueue childQueue, String label,
QueueCapacityVectorEntry capacityVectorEntry) {
String resourceName = capacityVectorEntry.getResourceName();
ResourceVector ratio = updateContext.getQueueBranchContext(childQueue.getParent().getQueuePath())
ResourceVector ratio = updateContext.getOrCreateQueueBranchContext(childQueue.getParent().getQueuePath())
.getNormalizedResourceRatios().getOrDefault(label, ResourceVector.of(1));

return ratio.getValue(resourceName) * capacityVectorEntry.getResourceValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;

import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;

/**
* A strategy class to encapsulate queue capacity setup and resource calculation
Expand Down Expand Up @@ -94,7 +90,7 @@ public void calculateResourcePrerequisites(QueueCapacityUpdateContext updateCont
CSQueue parentQueue) {
for (String label : parentQueue.getConfiguredNodeLabels()) {
// We need to set normalized resource ratio only once per parent
if (updateContext.getQueueBranchContext(parentQueue.getQueuePath())
if (updateContext.getOrCreateQueueBranchContext(parentQueue.getQueuePath())
.getNormalizedResourceRatios().isEmpty()) {
setNormalizedResourceRatio(updateContext, parentQueue, label);
}
Expand Down Expand Up @@ -225,7 +221,7 @@ private void setNormalizedResourceRatio(
.getUnits(), childrenConfiguredResource);

if (convertedValue != 0) {
Map<String, ResourceVector> normalizedResourceRatios = updateContext.getQueueBranchContext(
Map<String, ResourceVector> normalizedResourceRatios = updateContext.getOrCreateQueueBranchContext(
parentQueue.getQueuePath()).getNormalizedResourceRatios();
normalizedResourceRatios.putIfAbsent(label, ResourceVector.newInstance());
normalizedResourceRatios.get(label).setValue(resourceName, numeratorForMinRatio /
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public float calculateMinimumResource(

float parentAbsoluteCapacity = parentQueue.getOrCreateAbsoluteMinCapacityVector(label).getValue(
resourceName);
float remainingPerEffectiveResourceRatio = updateContext.getQueueBranchContext(
parentQueue.getQueuePath()).getBatchRemainingResources(label)
float remainingPerEffectiveResourceRatio = updateContext.getOrCreateQueueBranchContext(
parentQueue.getQueuePath()).getPostCalculatorRemainingResource(label)
.getValue(resourceName) / parentQueue.getEffectiveCapacity(label)
.getResourceValue(resourceName);
float absoluteCapacity = parentAbsoluteCapacity *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public class QueueBranchContext {
private final Map<String, ResourceVector> remainingResourceByLabel = new HashMap<>();
private final Map<String, ResourceVector> normalizedResourceRatio = new HashMap<>();
private final Map<String, ResourceVector> overallRemainingResource = new HashMap<>();
private final Map<String, Map<String, Float>> sumWeightsPerLabel = new HashMap<>();

/**
Expand All @@ -53,28 +54,52 @@ public float getSumWeightsByResource(String label, String resourceName) {
}

/**
* Sets the overall remaining resource under a parent that is available for its children to
* Sets the remaining resource under a parent that is available for its children to
* occupy.
*
* @param label node label
* @param resource resource vector
*/
public void setBatchRemainingResource(String label, ResourceVector resource) {
public void setPostCalculatorRemainingResource(String label, ResourceVector resource) {
remainingResourceByLabel.put(label, resource);
}

public Map<String, ResourceVector> getNormalizedResourceRatios() {
return normalizedResourceRatio;
}

/**
* Returns the remaining resources of a parent that is still available for its
* children.
*
* @param label node label
* @return remaining resources
*/
public ResourceVector getOverallRemainingResource(String label) {
overallRemainingResource.putIfAbsent(label, ResourceVector.newInstance());
return overallRemainingResource.get(label);
}

/**
* Sets the remaining resources of a parent that is still available for its children.
*
* @param label node label
* @param resourceVector resource vector
*/
public void setOverallRemainingResource(String label, ResourceVector resourceVector) {
overallRemainingResource.put(label, resourceVector);
}

/**
* Returns the remaining resources of a parent that is still available for its
* children. Decremented only after the calculator is finished its work on the corresponding
* resources.
*
* @param label node label
* @return remaining resources
*/
public ResourceVector getBatchRemainingResources(String label) {
return remainingResourceByLabel.getOrDefault(label, ResourceVector.newInstance());
public ResourceVector getPostCalculatorRemainingResource(String label) {
remainingResourceByLabel.putIfAbsent(label, ResourceVector.newInstance());
return remainingResourceByLabel.get(label);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@
*/
public class QueueCapacityUpdateContext {
private final Resource updatedClusterResource;

private final Map<String, QueueBranchContext> queueBranchContext
= LazyMap.decorate(new HashMap<String, QueueBranchContext>(),
QueueBranchContext::new);
private final RMNodeLabelsManager labelsManager;

private List<QueueUpdateWarning> warnings = new ArrayList<QueueUpdateWarning>();
private final Map<String, QueueBranchContext> queueBranchContext = new HashMap<>();
private final List<QueueUpdateWarning> warnings = new ArrayList<QueueUpdateWarning>();

public QueueCapacityUpdateContext(Resource updatedClusterResource,
RMNodeLabelsManager labelsManager) {
Expand Down Expand Up @@ -69,7 +66,8 @@ public Resource getUpdatedClusterResource() {
* @param queuePath queue path of the parent
* @return queue branch context
*/
public QueueBranchContext getQueueBranchContext(String queuePath) {
public QueueBranchContext getOrCreateQueueBranchContext(String queuePath) {
queueBranchContext.putIfAbsent(queuePath, new QueueBranchContext());
return queueBranchContext.get(queuePath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,21 @@ public void subtract(ResourceVector otherResourceVector) {
}

/**
* Increments the given resource by the specified value.
* Subtracts the given resource by the specified value.
* @param resourceName name of the resource
* @param value value to be added to the resource's current value
* @param value value to be subtracted from the resource's current value
*/
public void increment(String resourceName, float value) {
setValue(resourceName, getValue(resourceName) + value);
public void subtract(String resourceName, float value) {
setValue(resourceName, getValue(resourceName) - value);
}

/**
* Gets the average of all resource unit values.
* @return average of resource unit values
* Increments the given resource by the specified value.
* @param resourceName name of the resource
* @param value value to be added to the resource's current value
*/
public float getAverageValue() {
return (float) resourcesByName.values().stream().mapToDouble(value -> value).average()
.orElse(0);
}

public float getMaxValue() {
return resourcesByName.values().stream().max(Comparator.naturalOrder()).orElse(0f);
public void increment(String resourceName, float value) {
setValue(resourceName, getValue(resourceName) + value);
}

public float getValue(String resourceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void calculateResourcePrerequisites(
for (String label : childQueue.getConfiguredNodeLabels()) {
for (String resourceName : childQueue.getConfiguredCapacityVector(label)
.getResourceNamesByCapacityType(getCapacityType())) {
updateContext.getQueueBranchContext(parentQueue.getQueuePath())
updateContext.getOrCreateQueueBranchContext(parentQueue.getQueuePath())
.incrementWeight(label, resourceName, childQueue.getConfiguredCapacityVector(label)
.getResource(resourceName).getResourceValue());
}
Expand All @@ -51,11 +51,11 @@ public float calculateMinimumResource(
CSQueue parentQueue = childQueue.getParent();
String resourceName = capacityVectorEntry.getResourceName();
float normalizedWeight = capacityVectorEntry.getResourceValue()
/ updateContext.getQueueBranchContext(parentQueue.getQueuePath())
/ updateContext.getOrCreateQueueBranchContext(parentQueue.getQueuePath())
.getSumWeightsByResource(label, resourceName);

float remainingResource = updateContext.getQueueBranchContext(
parentQueue.getQueuePath()).getBatchRemainingResources(label)
float remainingResource = updateContext.getOrCreateQueueBranchContext(
parentQueue.getQueuePath()).getPostCalculatorRemainingResource(label)
.getValue(resourceName);

// Due to rounding loss it is better to use all remaining resources if no other resource uses
Expand Down Expand Up @@ -101,7 +101,7 @@ public void updateCapacitiesAfterCalculation(

Collection<String> resourceNames = getResourceNames(queue, label);
for (String resourceName : resourceNames) {
float sumBranchWeight = updateContext.getQueueBranchContext(queue.getParent().getQueuePath())
float sumBranchWeight = updateContext.getOrCreateQueueBranchContext(queue.getParent().getQueuePath())
.getSumWeightsByResource(label, resourceName);
float capacity = queue.getConfiguredCapacityVector(label).getResource(
resourceName).getResourceValue() / sumBranchWeight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public class TestMixedQueueResourceCalculation extends CapacitySchedulerQueueCal
public static final Resource B1_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(8095, 3);
public static final Resource C_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(5803, 4);

public static final Resource B_WARNING_RESOURCE = Resource.newInstance(8096, 3);
public static final Resource B_WARNING_RESOURCE = Resource.newInstance(8096, 4);
public static final Resource B1_WARNING_RESOURCE = Resource.newInstance(8096, 3);
public static final Resource A_WARNING_RESOURCE = Resource.newInstance(8288, 9);
public static final Resource A_WARNING_RESOURCE = Resource.newInstance(8288, 12);
public static final Resource A1_WARNING_RESOURCE = Resource.newInstance(2048, 4);
public static final Resource A2_WARNING_RESOURCE = Resource.newInstance(2048, 5);
public static final Resource A2_WARNING_RESOURCE = Resource.newInstance(2048, 8);
public static final Resource A12_WARNING_RESOURCE = Resource.newInstance(2048, 4);

@Override
Expand Down Expand Up @@ -165,20 +165,18 @@ public void testComplexHierarchyWithWarnings() throws IOException {
updateContext.getUpdateWarnings(), QueueUpdateWarningType.BRANCH_DOWNSCALED, B);
Optional<QueueUpdateWarning> queueA11ZeroResourceWarning = getSpecificWarning(
updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_ZERO_RESOURCE, A11);
Optional<QueueUpdateWarning> queueA12ZeroResourceWarning = getSpecificWarning(
updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_ZERO_RESOURCE, A12);

Assert.assertTrue(queueCZeroResourceWarning.isPresent());
Assert.assertTrue(queueARemainingResourceWarning.isPresent());
Assert.assertTrue(queueBDownscalingWarning.isPresent());
Assert.assertTrue(queueA11ZeroResourceWarning.isPresent());
Assert.assertTrue(queueA12ZeroResourceWarning.isPresent());
}

@Test
public void testZeroResourceIfNoMemory() throws IOException {
csConf.setCapacityVector(A, "", createMemoryVcoresVector(percentage(100), weight(6)));
csConf.setCapacityVector(B, "", createMemoryVcoresVector(absolute(MEMORY), absolute(VCORES * 0.5)));
csConf.setCapacityVector(B, "", createMemoryVcoresVector(absolute(MEMORY),
absolute(VCORES * 0.5)));

QueueAssertionBuilder assertionBuilder = createAssertionBuilder()
.withQueue(A)
Expand All @@ -200,10 +198,13 @@ public void testZeroResourceIfNoMemory() throws IOException {

@Test
public void testDifferentMinimumAndMaximumCapacityTypes() throws IOException {
csConf.setCapacityVector(A, "", createMemoryVcoresVector(percentage(50), absolute(VCORES * 0.5)));
csConf.setMaximumCapacityVector(A, "", createMemoryVcoresVector(absolute(MEMORY), percentage(80)));
csConf.setCapacityVector(A, "", createMemoryVcoresVector(percentage(50),
absolute(VCORES * 0.5)));
csConf.setMaximumCapacityVector(A, "", createMemoryVcoresVector(absolute(MEMORY),
percentage(80)));
csConf.setCapacityVector(B, "", createMemoryVcoresVector(weight(6), percentage(100)));
csConf.setMaximumCapacityVector(B, "", createMemoryVcoresVector(absolute(MEMORY), absolute(VCORES * 0.5)));
csConf.setMaximumCapacityVector(B, "", createMemoryVcoresVector(absolute(MEMORY),
absolute(VCORES * 0.5)));

QueueAssertionBuilder assertionBuilder = createAssertionBuilder()
.withQueue(A)
Expand All @@ -226,17 +227,20 @@ public void testDifferentMinimumAndMaximumCapacityTypes() throws IOException {
try {
cs.reinitialize(csConf, mockRM.getRMContext());
update(assertionBuilder, UPDATE_RESOURCE);
Assert.fail("WEIGHT maximum capacity type is not supported, an error should be thrown when set up");
Assert.fail("WEIGHT maximum capacity type is not supported, an error should be thrown when " +
"set up");
} catch (IllegalStateException ignored) {
}
}

@Test
public void testMaximumResourceWarnings() throws IOException {
csConf.setMaximumCapacityVector(A1, "", createMemoryVcoresVector(absolute(MEMORY * 0.5), percentage(100)));
csConf.setMaximumCapacityVector(A1, "", createMemoryVcoresVector(absolute(MEMORY * 0.5),
percentage(100)));
csConf.setCapacityVector(A11, "", createMemoryVcoresVector(percentage(50), percentage(100)));
csConf.setCapacityVector(A12, "", createMemoryVcoresVector(percentage(50), percentage(0)));
csConf.setMaximumCapacityVector(A11, "", createMemoryVcoresVector(absolute(MEMORY), percentage(10)));
csConf.setMaximumCapacityVector(A11, "", createMemoryVcoresVector(absolute(MEMORY),
percentage(10)));

QueueAssertionBuilder assertionBuilder = createAssertionBuilder()
.withQueue(A11)
Expand All @@ -253,7 +257,8 @@ public void testMaximumResourceWarnings() throws IOException {

QueueCapacityUpdateContext updateContext = update(assertionBuilder, UPDATE_RESOURCE);
Optional<QueueUpdateWarning> queueA11ExceedsParentMaxResourceWarning = getSpecificWarning(
updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_MAX_RESOURCE_EXCEEDS_PARENT, A11);
updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_MAX_RESOURCE_EXCEEDS_PARENT,
A11);
Optional<QueueUpdateWarning> queueA11MinExceedsMaxWarning = getSpecificWarning(
updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_EXCEEDS_MAX_RESOURCE, A11);
Assert.assertTrue(queueA11ExceedsParentMaxResourceWarning.isPresent());
Expand Down Expand Up @@ -288,7 +293,8 @@ private void setupQueueHierarchyWithWarnings() throws IOException {

csConf.setState(B, QueueState.RUNNING);
csConf.setCapacityVector(A, "", createMemoryVcoresVector(percentage(100), weight(6)));
csConf.setCapacityVector(A1, "", createMemoryVcoresVector(absolute(2048), absolute(VCORES * 0.25)));
csConf.setCapacityVector(A1, "", createMemoryVcoresVector(absolute(2048),
absolute(VCORES * 0.25)));
csConf.setCapacityVector(A11, "", createMemoryVcoresVector(weight(1), absolute(VCORES * 0.25)));
csConf.setCapacityVector(A12, "", createMemoryVcoresVector(percentage(100), percentage(100)));
csConf.setCapacityVector(A2, "", createMemoryVcoresVector(absolute(2048), percentage(100)));
Expand All @@ -308,7 +314,7 @@ private void setQueues() {
private Optional<QueueUpdateWarning> getSpecificWarning(
Collection<QueueUpdateWarning> warnings, QueueUpdateWarningType warningTypeToSelect,
String queue) {
return warnings.stream().filter((w) -> w.getWarningType().equals(warningTypeToSelect) && w.getQueue().equals(
queue)).findFirst();
return warnings.stream().filter((w) -> w.getWarningType().equals(warningTypeToSelect)
&& w.getQueue().equals(queue)).findFirst();
}
}

0 comments on commit 3dae1dc

Please sign in to comment.