diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbsoluteResourceCapacityCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbsoluteResourceCapacityCalculator.java index 25357e06095a2..3560c9a12589f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbsoluteResourceCapacityCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbsoluteResourceCapacityCalculator.java @@ -28,7 +28,7 @@ public float calculateMinimumResource( QueueCapacityUpdateContext updateContext, CSQueue childQueue, String label, QueueCapacityVectorEntry capacityVectorEntry) { String resourceName = capacityVectorEntry.getResourceName(); - ResourceVector ratio = updateContext.getQueueBranchContext(childQueue.getParent().getQueuePath()) + ResourceVector ratio = updateContext.getOrCreateQueueBranchContext(childQueue.getParent().getQueuePath()) .getNormalizedResourceRatios().getOrDefault(label, ResourceVector.of(1)); return ratio.getValue(resourceName) * capacityVectorEntry.getResourceValue(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractQueueCapacityCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractQueueCapacityCalculator.java index 61b57a7860dac..87c514b2621d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractQueueCapacityCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractQueueCapacityCalculator.java @@ -25,14 +25,10 @@ import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; import java.util.Map; import java.util.Set; import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; -import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; /** * A strategy class to encapsulate queue capacity setup and resource calculation @@ -94,7 +90,7 @@ public void calculateResourcePrerequisites(QueueCapacityUpdateContext updateCont CSQueue parentQueue) { for (String label : parentQueue.getConfiguredNodeLabels()) { // We need to set normalized resource ratio only once per parent - if (updateContext.getQueueBranchContext(parentQueue.getQueuePath()) + if (updateContext.getOrCreateQueueBranchContext(parentQueue.getQueuePath()) .getNormalizedResourceRatios().isEmpty()) { setNormalizedResourceRatio(updateContext, parentQueue, label); } @@ -225,7 +221,7 @@ private void setNormalizedResourceRatio( .getUnits(), childrenConfiguredResource); if (convertedValue != 0) { - Map normalizedResourceRatios = updateContext.getQueueBranchContext( + Map normalizedResourceRatios = updateContext.getOrCreateQueueBranchContext( parentQueue.getQueuePath()).getNormalizedResourceRatios(); normalizedResourceRatios.putIfAbsent(label, ResourceVector.newInstance()); normalizedResourceRatios.get(label).setValue(resourceName, numeratorForMinRatio / diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java index f1a3fa8cebfb0..cceff569d5278 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueCapacityHandler.java @@ -28,14 +28,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT; @@ -53,14 +58,13 @@ public class CapacitySchedulerQueueCapacityHandler { ResourceUnitCapacityType.ABSOLUTE, ResourceUnitCapacityType.PERCENTAGE, ResourceUnitCapacityType.WEIGHT); - public static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); - private final Map calculators; private final AbstractQueueCapacityCalculator rootCalculator = new RootQueueCapacityCalculator(); private final RMNodeLabelsManager labelsManager; - private QueueResourceRoundingStrategy roundingStrategy = + private final Collection definedResources = new LinkedHashSet<>(); + private final QueueResourceRoundingStrategy roundingStrategy = new DefaultQueueResourceRoundingStrategy(); public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager labelsManager) { @@ -73,6 +77,8 @@ public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager labelsManager) new PercentageQueueCapacityCalculator()); this.calculators.put(ResourceUnitCapacityType.WEIGHT, new WeightQueueCapacityCalculator()); + + loadResourceNames(); } /** @@ -124,7 +130,6 @@ private void updateRoot( queue.refreshAfterResourceCalculation(newContext.getUpdatedClusterResource(), resourceLimits); } - private void updateChildren( CSQueue parent, QueueCapacityUpdateContext updateContext, ResourceLimits resourceLimits) { @@ -133,152 +138,134 @@ private void updateChildren( } for (String label : parent.getConfiguredNodeLabels()) { - updateContext.getQueueBranchContext(parent.getQueuePath()).setBatchRemainingResource(label, + updateContext.getOrCreateQueueBranchContext(parent.getQueuePath()).setPostCalculatorRemainingResource(label, + ResourceVector.of(parent.getEffectiveCapacity(label))); + updateContext.getOrCreateQueueBranchContext(parent.getQueuePath()).setOverallRemainingResource(label, ResourceVector.of(parent.getEffectiveCapacity(label))); } + for (AbstractQueueCapacityCalculator capacityCalculator : calculators.values()) { + capacityCalculator.calculateResourcePrerequisites(updateContext, parent); + } + calculateChildrenResources(updateContext, parent); for (String label : parent.getConfiguredNodeLabels()) { - if (!updateContext.getQueueBranchContext(parent.getQueuePath()).getBatchRemainingResources( + if (!updateContext.getOrCreateQueueBranchContext(parent.getQueuePath()).getPostCalculatorRemainingResource( label).equals(ResourceVector.newInstance())) { updateContext.addUpdateWarning(QueueUpdateWarningType.BRANCH_UNDERUTILIZED.ofQueue( parent.getQueuePath())); } } - for (CSQueue childQueue : parent.getChildQueues()) { - for (String label : childQueue.getConfiguredNodeLabels()) { - updateChildCapacities(updateContext, childQueue, label); - } - - ResourceLimits childLimit = ((ParentQueue) parent).getResourceLimitsOfChild( - childQueue, updateContext.getUpdatedClusterResource(), resourceLimits, NO_LABEL, false); - childQueue.refreshAfterResourceCalculation(updateContext.getUpdatedClusterResource(), - childLimit); - updateChildren(childQueue, updateContext, childLimit); - } - } - - private void updateChildCapacities( - QueueCapacityUpdateContext updateContext, CSQueue childQueue, String label) { - QueueCapacityVector capacityVector = childQueue.getConfiguredCapacityVector(label); - if (capacityVector.isMixedCapacityVector()) { - // Post update capacities based on the calculated effective resource values - AbstractQueueCapacityCalculator.setQueueCapacities(updateContext.getUpdatedClusterResource( - label), childQueue, label); - } else { - // Update capacities according to the legacy logic - for (ResourceUnitCapacityType capacityType : - childQueue.getConfiguredCapacityVector(label).getDefinedCapacityTypes()) { - AbstractQueueCapacityCalculator calculator = calculators.get(capacityType); - calculator.updateCapacitiesAfterCalculation(updateContext, childQueue, label); - } - } - - // If memory is zero, all other resource units should be considered zero as well. - if (childQueue.getEffectiveCapacity(label).getMemorySize() == 0) { - childQueue.getQueueResourceQuotas().setEffectiveMinResource(label, ZERO_RESOURCE); - } - - if (childQueue.getEffectiveMaxCapacity(label).getMemorySize() == 0) { - childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label, ZERO_RESOURCE); - } + updateChildrenAfterCalculation(parent, updateContext, resourceLimits); } private void calculateChildrenResources( QueueCapacityUpdateContext updateContext, CSQueue parent) { - for (ResourceUnitCapacityType capacityType : CALCULATOR_PRECEDENCE) { - Map aggregatedResources = new HashMap<>(); - AbstractQueueCapacityCalculator capacityCalculator = calculators.get(capacityType); - capacityCalculator.calculateResourcePrerequisites(updateContext, parent); - - for (CSQueue childQueue : parent.getChildQueues()) { - childQueue.getWriteLock().lock(); - try { - for (String label : childQueue.getConfiguredNodeLabels()) { - ResourceVector aggregatedUsedResource = aggregatedResources.getOrDefault(label, - ResourceVector.newInstance()); - setChildResources(updateContext, childQueue, label, aggregatedUsedResource, capacityCalculator); - aggregatedResources.put(label, aggregatedUsedResource); + for (String resourceName : definedResources) { + for (ResourceUnitCapacityType capacityType : CALCULATOR_PRECEDENCE) { + Map effectiveResourceUsedByLabel = new HashMap<>(); + for (CSQueue childQueue : parent.getChildQueues()) { + childQueue.getWriteLock().lock(); + try { + for (String label : childQueue.getConfiguredNodeLabels()) { + QueueCapacityVector capacityVector = childQueue.getConfiguredCapacityVector(label); + if (!childQueue.getConfiguredCapacityVector(label).isResourceOfType( + resourceName, capacityType)) { + continue; + } + float aggregatedUsedResource = effectiveResourceUsedByLabel.getOrDefault(label, + 0f); + float usedResourceByChild = setChildResources(updateContext, childQueue, label, + capacityVector.getResource(resourceName)); + float resourceUsedByLabel = aggregatedUsedResource + usedResourceByChild; + updateContext.getOrCreateQueueBranchContext(parent.getQueuePath()) + .getOverallRemainingResource(label).subtract(resourceName, usedResourceByChild); + effectiveResourceUsedByLabel.put(label, resourceUsedByLabel); + } + } finally { + childQueue.getWriteLock().unlock(); } - } finally { - childQueue.getWriteLock().unlock(); } - } - for (Map.Entry entry : aggregatedResources.entrySet()) { - updateContext.getQueueBranchContext(parent.getQueuePath()).getBatchRemainingResources( - entry.getKey()).subtract(entry.getValue()); + // Only decrement post calculator remaining resource at the end of each calculator phase + for (Map.Entry entry : effectiveResourceUsedByLabel.entrySet()) { + updateContext.getOrCreateQueueBranchContext(parent.getQueuePath()).getPostCalculatorRemainingResource( + entry.getKey()).subtract(resourceName, entry.getValue()); + } } } } - private void setChildResources( + private float setChildResources( QueueCapacityUpdateContext updateContext, CSQueue childQueue, String label, - ResourceVector usedResourcesOfCalculator, AbstractQueueCapacityCalculator capacityCalculator) { - for (String resourceName : capacityCalculator.getResourceNames(childQueue, label)) { - long clusterResource = updateContext.getUpdatedClusterResource(label).getResourceValue( - resourceName); - QueueCapacityVectorEntry capacityVectorEntry = childQueue.getConfiguredCapacityVector(label) - .getResource(resourceName); - QueueCapacityVectorEntry maximumCapacityVectorEntry = childQueue - .getConfiguredMaximumCapacityVector(label).getResource(resourceName); - AbstractQueueCapacityCalculator maximumCapacityCalculator = calculators.get( - maximumCapacityVectorEntry.getVectorResourceType()); - - float minimumResource = capacityCalculator.calculateMinimumResource( - updateContext, childQueue, label, capacityVectorEntry); - float maximumResource = maximumCapacityCalculator.calculateMaximumResource( - updateContext, childQueue, label, maximumCapacityVectorEntry); - - minimumResource = roundingStrategy.getRoundedResource(minimumResource, capacityVectorEntry); - maximumResource = roundingStrategy.getRoundedResource(maximumResource, - maximumCapacityVectorEntry); - Pair resources = validateCalculatedResources(updateContext, childQueue, resourceName, label, - usedResourcesOfCalculator, new ImmutablePair<>(minimumResource, maximumResource)); - minimumResource = resources.getLeft(); - maximumResource = resources.getRight(); - - float absoluteMinCapacity = minimumResource / clusterResource; - float absoluteMaxCapacity = maximumResource / clusterResource; - childQueue.getOrCreateAbsoluteMinCapacityVector(label).setValue( - resourceName, absoluteMinCapacity); - childQueue.getOrCreateAbsoluteMaxCapacityVector(label).setValue( - resourceName, absoluteMaxCapacity); - - childQueue.getQueueResourceQuotas().getEffectiveMinResource(label).setResourceValue( - resourceName, (long) minimumResource); - childQueue.getQueueResourceQuotas().getEffectiveMaxResource(label).setResourceValue( - resourceName, (long) maximumResource); - - usedResourcesOfCalculator.increment(resourceName, minimumResource); - } + QueueCapacityVectorEntry capacityVectorEntry) { + AbstractQueueCapacityCalculator capacityCalculator = calculators.get( + capacityVectorEntry.getVectorResourceType()); + String resourceName = capacityVectorEntry.getResourceName(); + long clusterResource = updateContext.getUpdatedClusterResource(label).getResourceValue( + resourceName); + QueueCapacityVectorEntry maximumCapacityVectorEntry = childQueue + .getConfiguredMaximumCapacityVector(label).getResource(resourceName); + AbstractQueueCapacityCalculator maximumCapacityCalculator = calculators.get( + maximumCapacityVectorEntry.getVectorResourceType()); + + float minimumResource = capacityCalculator.calculateMinimumResource( + updateContext, childQueue, label, capacityVectorEntry); + float maximumResource = maximumCapacityCalculator.calculateMaximumResource( + updateContext, childQueue, label, maximumCapacityVectorEntry); + + minimumResource = roundingStrategy.getRoundedResource(minimumResource, capacityVectorEntry); + maximumResource = roundingStrategy.getRoundedResource(maximumResource, + maximumCapacityVectorEntry); + Pair resources = validateCalculatedResources(updateContext, childQueue, + resourceName, label, new ImmutablePair<>(minimumResource, maximumResource)); + minimumResource = resources.getLeft(); + maximumResource = resources.getRight(); + + float absoluteMinCapacity = minimumResource / clusterResource; + float absoluteMaxCapacity = maximumResource / clusterResource; + childQueue.getOrCreateAbsoluteMinCapacityVector(label).setValue( + resourceName, absoluteMinCapacity); + childQueue.getOrCreateAbsoluteMaxCapacityVector(label).setValue( + resourceName, absoluteMaxCapacity); + + childQueue.getQueueResourceQuotas().getEffectiveMinResource(label).setResourceValue( + resourceName, (long) minimumResource); + childQueue.getQueueResourceQuotas().getEffectiveMaxResource(label).setResourceValue( + resourceName, (long) maximumResource); + + return minimumResource; } private Pair validateCalculatedResources( QueueCapacityUpdateContext updateContext, CSQueue childQueue, String resourceName, - String label, ResourceVector usedResourcesOfCalculator, Pair calculatedResources) { + String label, Pair calculatedResources) { CSQueue parentQueue = childQueue.getParent(); float minimumResource = calculatedResources.getLeft(); long minimumMemoryResource = childQueue.getQueueResourceQuotas().getEffectiveMinResource(label) .getMemorySize(); - float remainingResourceUnderParent = updateContext.getQueueBranchContext( - parentQueue.getQueuePath()).getBatchRemainingResources(label).getValue(resourceName) - - usedResourcesOfCalculator.getValue(resourceName); + float remainingResourceUnderParent = updateContext.getOrCreateQueueBranchContext( + parentQueue.getQueuePath()).getOverallRemainingResource(label).getValue(resourceName); long parentMaximumResource = parentQueue.getEffectiveMaxCapacity(label).getResourceValue( resourceName); float maximumResource = calculatedResources.getRight(); + // Memory is the primary resource, if its zero, all other resource units are zero as well. + if (!resourceName.equals(MEMORY_URI) && minimumMemoryResource == 0) { + minimumResource = 0; + } + if (maximumResource != 0 && maximumResource > parentMaximumResource) { updateContext.addUpdateWarning(QueueUpdateWarningType.QUEUE_MAX_RESOURCE_EXCEEDS_PARENT.ofQueue( childQueue.getQueuePath())); } - maximumResource = maximumResource == 0 ? parentMaximumResource - : Math.min(maximumResource, parentMaximumResource); + maximumResource = maximumResource == 0 ? parentMaximumResource : Math.min(maximumResource, + parentMaximumResource); if (maximumResource < minimumResource) { updateContext.addUpdateWarning(QueueUpdateWarningType.QUEUE_EXCEEDS_MAX_RESOURCE.ofQueue( @@ -305,4 +292,50 @@ private Pair validateCalculatedResources( return new ImmutablePair<>(minimumResource, maximumResource); } + + private void updateChildrenAfterCalculation( + CSQueue parent, QueueCapacityUpdateContext updateContext, ResourceLimits resourceLimits) { + for (CSQueue childQueue : parent.getChildQueues()) { + updateChildCapacities(updateContext, childQueue); + ResourceLimits childLimit = ((ParentQueue) parent).getResourceLimitsOfChild( + childQueue, updateContext.getUpdatedClusterResource(), resourceLimits, NO_LABEL, false); + childQueue.refreshAfterResourceCalculation(updateContext.getUpdatedClusterResource(), + childLimit); + updateChildren(childQueue, updateContext, childLimit); + } + } + + private void updateChildCapacities( + QueueCapacityUpdateContext updateContext, CSQueue childQueue) { + for (String label : childQueue.getConfiguredNodeLabels()) { + QueueCapacityVector capacityVector = childQueue.getConfiguredCapacityVector(label); + if (capacityVector.isMixedCapacityVector()) { + // Post update capacities based on the calculated effective resource values + AbstractQueueCapacityCalculator.setQueueCapacities(updateContext.getUpdatedClusterResource( + label), childQueue, label); + } else { + // Update capacities according to the legacy logic + for (ResourceUnitCapacityType capacityType : + childQueue.getConfiguredCapacityVector(label).getDefinedCapacityTypes()) { + AbstractQueueCapacityCalculator calculator = calculators.get(capacityType); + calculator.updateCapacitiesAfterCalculation(updateContext, childQueue, label); + } + } + } + } + + private void loadResourceNames() { + Set resources = new HashSet<>(ResourceUtils.getResourceTypes().keySet()); + if (resources.contains(MEMORY_URI)) { + resources.remove(MEMORY_URI); + definedResources.add(MEMORY_URI); + } + + if (resources.contains(VCORES_URI)) { + resources.remove(VCORES_URI); + definedResources.add(VCORES_URI); + } + + definedResources.addAll(resources); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PercentageQueueCapacityCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PercentageQueueCapacityCalculator.java index a27899c6625b3..832f6872019d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PercentageQueueCapacityCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PercentageQueueCapacityCalculator.java @@ -32,8 +32,8 @@ public float calculateMinimumResource( float parentAbsoluteCapacity = parentQueue.getOrCreateAbsoluteMinCapacityVector(label).getValue( resourceName); - float remainingPerEffectiveResourceRatio = updateContext.getQueueBranchContext( - parentQueue.getQueuePath()).getBatchRemainingResources(label) + float remainingPerEffectiveResourceRatio = updateContext.getOrCreateQueueBranchContext( + parentQueue.getQueuePath()).getPostCalculatorRemainingResource(label) .getValue(resourceName) / parentQueue.getEffectiveCapacity(label) .getResourceValue(resourceName); float absoluteCapacity = parentAbsoluteCapacity * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueBranchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueBranchContext.java index 4e1e3d7fb97e8..70c6a0d331b0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueBranchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueBranchContext.java @@ -28,6 +28,7 @@ public class QueueBranchContext { private final Map remainingResourceByLabel = new HashMap<>(); private final Map normalizedResourceRatio = new HashMap<>(); + private final Map overallRemainingResource = new HashMap<>(); private final Map> sumWeightsPerLabel = new HashMap<>(); /** @@ -53,13 +54,13 @@ public float getSumWeightsByResource(String label, String resourceName) { } /** - * Sets the overall remaining resource under a parent that is available for its children to + * Sets the remaining resource under a parent that is available for its children to * occupy. * * @param label node label * @param resource resource vector */ - public void setBatchRemainingResource(String label, ResourceVector resource) { + public void setPostCalculatorRemainingResource(String label, ResourceVector resource) { remainingResourceByLabel.put(label, resource); } @@ -67,14 +68,38 @@ public Map getNormalizedResourceRatios() { return normalizedResourceRatio; } + /** + * Returns the remaining resources of a parent that is still available for its + * children. + * + * @param label node label + * @return remaining resources + */ + public ResourceVector getOverallRemainingResource(String label) { + overallRemainingResource.putIfAbsent(label, ResourceVector.newInstance()); + return overallRemainingResource.get(label); + } + + /** + * Sets the remaining resources of a parent that is still available for its children. + * + * @param label node label + * @param resourceVector resource vector + */ + public void setOverallRemainingResource(String label, ResourceVector resourceVector) { + overallRemainingResource.put(label, resourceVector); + } + /** * Returns the remaining resources of a parent that is still available for its * children. Decremented only after the calculator is finished its work on the corresponding * resources. + * * @param label node label * @return remaining resources */ - public ResourceVector getBatchRemainingResources(String label) { - return remainingResourceByLabel.getOrDefault(label, ResourceVector.newInstance()); + public ResourceVector getPostCalculatorRemainingResource(String label) { + remainingResourceByLabel.putIfAbsent(label, ResourceVector.newInstance()); + return remainingResourceByLabel.get(label); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityUpdateContext.java index 21de811f1c298..ee3c4e49e29ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityUpdateContext.java @@ -33,13 +33,10 @@ */ public class QueueCapacityUpdateContext { private final Resource updatedClusterResource; - - private final Map queueBranchContext - = LazyMap.decorate(new HashMap(), - QueueBranchContext::new); private final RMNodeLabelsManager labelsManager; - private List warnings = new ArrayList(); + private final Map queueBranchContext = new HashMap<>(); + private final List warnings = new ArrayList(); public QueueCapacityUpdateContext(Resource updatedClusterResource, RMNodeLabelsManager labelsManager) { @@ -69,7 +66,8 @@ public Resource getUpdatedClusterResource() { * @param queuePath queue path of the parent * @return queue branch context */ - public QueueBranchContext getQueueBranchContext(String queuePath) { + public QueueBranchContext getOrCreateQueueBranchContext(String queuePath) { + queueBranchContext.putIfAbsent(queuePath, new QueueBranchContext()); return queueBranchContext.get(queuePath); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java index 5f6e769b2dfaa..6011d4d686de5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java @@ -91,25 +91,21 @@ public void subtract(ResourceVector otherResourceVector) { } /** - * Increments the given resource by the specified value. + * Subtracts the given resource by the specified value. * @param resourceName name of the resource - * @param value value to be added to the resource's current value + * @param value value to be subtracted from the resource's current value */ - public void increment(String resourceName, float value) { - setValue(resourceName, getValue(resourceName) + value); + public void subtract(String resourceName, float value) { + setValue(resourceName, getValue(resourceName) - value); } /** - * Gets the average of all resource unit values. - * @return average of resource unit values + * Increments the given resource by the specified value. + * @param resourceName name of the resource + * @param value value to be added to the resource's current value */ - public float getAverageValue() { - return (float) resourcesByName.values().stream().mapToDouble(value -> value).average() - .orElse(0); - } - - public float getMaxValue() { - return resourcesByName.values().stream().max(Comparator.naturalOrder()).orElse(0f); + public void increment(String resourceName, float value) { + setValue(resourceName, getValue(resourceName) + value); } public float getValue(String resourceName) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WeightQueueCapacityCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WeightQueueCapacityCalculator.java index 3294c69903a2c..3e77a28637bbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WeightQueueCapacityCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WeightQueueCapacityCalculator.java @@ -36,7 +36,7 @@ public void calculateResourcePrerequisites( for (String label : childQueue.getConfiguredNodeLabels()) { for (String resourceName : childQueue.getConfiguredCapacityVector(label) .getResourceNamesByCapacityType(getCapacityType())) { - updateContext.getQueueBranchContext(parentQueue.getQueuePath()) + updateContext.getOrCreateQueueBranchContext(parentQueue.getQueuePath()) .incrementWeight(label, resourceName, childQueue.getConfiguredCapacityVector(label) .getResource(resourceName).getResourceValue()); } @@ -51,11 +51,11 @@ public float calculateMinimumResource( CSQueue parentQueue = childQueue.getParent(); String resourceName = capacityVectorEntry.getResourceName(); float normalizedWeight = capacityVectorEntry.getResourceValue() - / updateContext.getQueueBranchContext(parentQueue.getQueuePath()) + / updateContext.getOrCreateQueueBranchContext(parentQueue.getQueuePath()) .getSumWeightsByResource(label, resourceName); - float remainingResource = updateContext.getQueueBranchContext( - parentQueue.getQueuePath()).getBatchRemainingResources(label) + float remainingResource = updateContext.getOrCreateQueueBranchContext( + parentQueue.getQueuePath()).getPostCalculatorRemainingResource(label) .getValue(resourceName); // Due to rounding loss it is better to use all remaining resources if no other resource uses @@ -101,7 +101,7 @@ public void updateCapacitiesAfterCalculation( Collection resourceNames = getResourceNames(queue, label); for (String resourceName : resourceNames) { - float sumBranchWeight = updateContext.getQueueBranchContext(queue.getParent().getQueuePath()) + float sumBranchWeight = updateContext.getOrCreateQueueBranchContext(queue.getParent().getQueuePath()) .getSumWeightsByResource(label, resourceName); float capacity = queue.getConfiguredCapacityVector(label).getResource( resourceName).getResourceValue() / sumBranchWeight; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestMixedQueueResourceCalculation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestMixedQueueResourceCalculation.java index d42f9a614f9a3..cd5d3babc38c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestMixedQueueResourceCalculation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestMixedQueueResourceCalculation.java @@ -47,11 +47,11 @@ public class TestMixedQueueResourceCalculation extends CapacitySchedulerQueueCal public static final Resource B1_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(8095, 3); public static final Resource C_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(5803, 4); - public static final Resource B_WARNING_RESOURCE = Resource.newInstance(8096, 3); + public static final Resource B_WARNING_RESOURCE = Resource.newInstance(8096, 4); public static final Resource B1_WARNING_RESOURCE = Resource.newInstance(8096, 3); - public static final Resource A_WARNING_RESOURCE = Resource.newInstance(8288, 9); + public static final Resource A_WARNING_RESOURCE = Resource.newInstance(8288, 12); public static final Resource A1_WARNING_RESOURCE = Resource.newInstance(2048, 4); - public static final Resource A2_WARNING_RESOURCE = Resource.newInstance(2048, 5); + public static final Resource A2_WARNING_RESOURCE = Resource.newInstance(2048, 8); public static final Resource A12_WARNING_RESOURCE = Resource.newInstance(2048, 4); @Override @@ -165,20 +165,18 @@ public void testComplexHierarchyWithWarnings() throws IOException { updateContext.getUpdateWarnings(), QueueUpdateWarningType.BRANCH_DOWNSCALED, B); Optional queueA11ZeroResourceWarning = getSpecificWarning( updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_ZERO_RESOURCE, A11); - Optional queueA12ZeroResourceWarning = getSpecificWarning( - updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_ZERO_RESOURCE, A12); Assert.assertTrue(queueCZeroResourceWarning.isPresent()); Assert.assertTrue(queueARemainingResourceWarning.isPresent()); Assert.assertTrue(queueBDownscalingWarning.isPresent()); Assert.assertTrue(queueA11ZeroResourceWarning.isPresent()); - Assert.assertTrue(queueA12ZeroResourceWarning.isPresent()); } @Test public void testZeroResourceIfNoMemory() throws IOException { csConf.setCapacityVector(A, "", createMemoryVcoresVector(percentage(100), weight(6))); - csConf.setCapacityVector(B, "", createMemoryVcoresVector(absolute(MEMORY), absolute(VCORES * 0.5))); + csConf.setCapacityVector(B, "", createMemoryVcoresVector(absolute(MEMORY), + absolute(VCORES * 0.5))); QueueAssertionBuilder assertionBuilder = createAssertionBuilder() .withQueue(A) @@ -200,10 +198,13 @@ public void testZeroResourceIfNoMemory() throws IOException { @Test public void testDifferentMinimumAndMaximumCapacityTypes() throws IOException { - csConf.setCapacityVector(A, "", createMemoryVcoresVector(percentage(50), absolute(VCORES * 0.5))); - csConf.setMaximumCapacityVector(A, "", createMemoryVcoresVector(absolute(MEMORY), percentage(80))); + csConf.setCapacityVector(A, "", createMemoryVcoresVector(percentage(50), + absolute(VCORES * 0.5))); + csConf.setMaximumCapacityVector(A, "", createMemoryVcoresVector(absolute(MEMORY), + percentage(80))); csConf.setCapacityVector(B, "", createMemoryVcoresVector(weight(6), percentage(100))); - csConf.setMaximumCapacityVector(B, "", createMemoryVcoresVector(absolute(MEMORY), absolute(VCORES * 0.5))); + csConf.setMaximumCapacityVector(B, "", createMemoryVcoresVector(absolute(MEMORY), + absolute(VCORES * 0.5))); QueueAssertionBuilder assertionBuilder = createAssertionBuilder() .withQueue(A) @@ -226,17 +227,20 @@ public void testDifferentMinimumAndMaximumCapacityTypes() throws IOException { try { cs.reinitialize(csConf, mockRM.getRMContext()); update(assertionBuilder, UPDATE_RESOURCE); - Assert.fail("WEIGHT maximum capacity type is not supported, an error should be thrown when set up"); + Assert.fail("WEIGHT maximum capacity type is not supported, an error should be thrown when " + + "set up"); } catch (IllegalStateException ignored) { } } @Test public void testMaximumResourceWarnings() throws IOException { - csConf.setMaximumCapacityVector(A1, "", createMemoryVcoresVector(absolute(MEMORY * 0.5), percentage(100))); + csConf.setMaximumCapacityVector(A1, "", createMemoryVcoresVector(absolute(MEMORY * 0.5), + percentage(100))); csConf.setCapacityVector(A11, "", createMemoryVcoresVector(percentage(50), percentage(100))); csConf.setCapacityVector(A12, "", createMemoryVcoresVector(percentage(50), percentage(0))); - csConf.setMaximumCapacityVector(A11, "", createMemoryVcoresVector(absolute(MEMORY), percentage(10))); + csConf.setMaximumCapacityVector(A11, "", createMemoryVcoresVector(absolute(MEMORY), + percentage(10))); QueueAssertionBuilder assertionBuilder = createAssertionBuilder() .withQueue(A11) @@ -253,7 +257,8 @@ public void testMaximumResourceWarnings() throws IOException { QueueCapacityUpdateContext updateContext = update(assertionBuilder, UPDATE_RESOURCE); Optional queueA11ExceedsParentMaxResourceWarning = getSpecificWarning( - updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_MAX_RESOURCE_EXCEEDS_PARENT, A11); + updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_MAX_RESOURCE_EXCEEDS_PARENT, + A11); Optional queueA11MinExceedsMaxWarning = getSpecificWarning( updateContext.getUpdateWarnings(), QueueUpdateWarningType.QUEUE_EXCEEDS_MAX_RESOURCE, A11); Assert.assertTrue(queueA11ExceedsParentMaxResourceWarning.isPresent()); @@ -288,7 +293,8 @@ private void setupQueueHierarchyWithWarnings() throws IOException { csConf.setState(B, QueueState.RUNNING); csConf.setCapacityVector(A, "", createMemoryVcoresVector(percentage(100), weight(6))); - csConf.setCapacityVector(A1, "", createMemoryVcoresVector(absolute(2048), absolute(VCORES * 0.25))); + csConf.setCapacityVector(A1, "", createMemoryVcoresVector(absolute(2048), + absolute(VCORES * 0.25))); csConf.setCapacityVector(A11, "", createMemoryVcoresVector(weight(1), absolute(VCORES * 0.25))); csConf.setCapacityVector(A12, "", createMemoryVcoresVector(percentage(100), percentage(100))); csConf.setCapacityVector(A2, "", createMemoryVcoresVector(absolute(2048), percentage(100))); @@ -308,7 +314,7 @@ private void setQueues() { private Optional getSpecificWarning( Collection warnings, QueueUpdateWarningType warningTypeToSelect, String queue) { - return warnings.stream().filter((w) -> w.getWarningType().equals(warningTypeToSelect) && w.getQueue().equals( - queue)).findFirst(); + return warnings.stream().filter((w) -> w.getWarningType().equals(warningTypeToSelect) + && w.getQueue().equals(queue)).findFirst(); } }