Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-10965. Centralize queue resource calculation based on CapacityVectors #3470

Closed
wants to merge 33 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
86b32bd
YARN-10930. Introduce universal capacity resource vector
9uapaw Aug 31, 2021
b935b29
YARN-10930. Cleanup for QueueCapacityVector
9uapaw Sep 21, 2021
5f3c0da
YARN-10965. Introduce enhanced queue calculation
9uapaw Sep 23, 2021
6ee8dc2
YARN-10930. Cover additional test cases
9uapaw Oct 14, 2021
ca8016c
YARN-10965. Extend
9uapaw Oct 14, 2021
4fb353c
YARN-10965. Extend 2
9uapaw Oct 19, 2021
8df3901
YARN-10930. Fix review feedbacks
9uapaw Oct 20, 2021
8c6559f
YARN-10965. Simplify calculation
9uapaw Oct 21, 2021
0b7adc8
YARN-10965. Simplify calculators
9uapaw Oct 29, 2021
2bcb962
Merge remote-tracking branch 'origin/trunk' into YARN-10965
9uapaw Oct 29, 2021
83aabd0
YARN-10965. Simplify logic in QueueHandler
9uapaw Nov 15, 2021
3dae1dc
YARN-10965. Implement strict resource iteration order
9uapaw Nov 16, 2021
e549acc
YARN-10965. Introduce driver concept to simplify the logic and encaps…
9uapaw Nov 18, 2021
6c629ea
YARN-10965. Fix root driver child queue setting
9uapaw Nov 18, 2021
27e6344
YARN-10965. Fix warnings and nits
9uapaw Nov 22, 2021
e6609f2
YARN-10965. Make Absolute calculator use remaining resource ratio as …
9uapaw Nov 22, 2021
784c1be
YARN-10965. Create node label test
9uapaw Nov 23, 2021
0793322
YARN-10965. Make rounding strategies more flexible with regards to ca…
9uapaw Nov 30, 2021
75bca46
YARN-10965. Simplify test API
9uapaw Dec 9, 2021
513555c
YARN-10965. Fix review feedbacks
9uapaw Dec 9, 2021
01c3d5b
Merge branch 'trunk' into YARN-10965
9uapaw Dec 15, 2021
ec56a1c
YARN-10965. Fix checkstyle issues
9uapaw Dec 15, 2021
4deb72a
YARN-10965. Introduce calculation iteration context
9uapaw Feb 1, 2022
044cf3b
YARN-10965. Fix review feedbacks
9uapaw Feb 2, 2022
c648a82
Merge branch 'trunk' into YARN-10965
9uapaw Feb 2, 2022
faa9a9a
YARN-10965. Remove unnecessary change in LeafQueue
9uapaw Feb 2, 2022
c16c335
YARN-10965. Fix review feedback
9uapaw Feb 17, 2022
415342d
Merge branch 'trunk' into YARN-10965
9uapaw Mar 22, 2022
785f2fa
YARN-10965. Fix checkstyle issues and nomenclature nits
9uapaw Mar 22, 2022
edbf3da
Merge branch 'trunk' into YARN-10965
9uapaw Aug 30, 2022
673a8c0
YARN-10965. Fix javac and spotbugs errors
9uapaw Aug 30, 2022
2958698
Merge branch 'apache:trunk' into YARN-10965
9uapaw Jan 25, 2023
ce2618d
Fix checkstyle
szilard-nemeth Jan 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
YARN-10965. Fix review feedbacks
9uapaw committed Feb 2, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 044cf3b72c87f84c3051c56c45a37016252e2a52
Original file line number Diff line number Diff line change
@@ -818,7 +818,7 @@ public static Resource createResourceWithSameValue(long value) {
return res;
}

public static Resource multiply(Resource resource, float multiplier) {
public static Resource multiplyFloor(Resource resource, double multiplier) {
Resource newResource = Resource.newInstance(0, 0);

for (ResourceInformation resourceInformation : resource.getResources()) {
@@ -829,7 +829,7 @@ public static Resource multiply(Resource resource, float multiplier) {
return newResource;
}

public static Resource multiplyRound(Resource resource, float multiplier) {
public static Resource multiplyRound(Resource resource, double multiplier) {
Resource newResource = Resource.newInstance(0, 0);

for (ResourceInformation resourceInformation : resource.getResources()) {
Original file line number Diff line number Diff line change
@@ -19,33 +19,33 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ResourceCalculationDriver.CalculationContext;

public class AbsoluteResourceCapacityCalculator extends AbstractQueueCapacityCalculator {
szilard-nemeth marked this conversation as resolved.
Show resolved Hide resolved

@Override
public float calculateMinimumResource(
public double calculateMinimumResource(
ResourceCalculationDriver resourceCalculationDriver, CalculationContext context, String label) {
String resourceName = context.getResourceName();
float normalizedRatio = resourceCalculationDriver.getNormalizedResourceRatios().getOrDefault(
double normalizedRatio = resourceCalculationDriver.getNormalizedResourceRatios().getOrDefault(
label, ResourceVector.of(1)).getValue(resourceName);
float remainingResourceRatio = resourceCalculationDriver.getRemainingRatioOfResource(
double remainingResourceRatio = resourceCalculationDriver.getRemainingRatioOfResource(
label, resourceName);

return normalizedRatio * remainingResourceRatio * context.getCurrentMinimumCapacityEntry(
label).getResourceValue();
}

@Override
public float calculateMaximumResource(
public double calculateMaximumResource(
ResourceCalculationDriver resourceCalculationDriver, CalculationContext context, String label) {
return context.getCurrentMaximumCapacityEntry(label).getResourceValue();
}

@Override
public void updateCapacitiesAfterCalculation(
ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label) {
resourceCalculationDriver.setQueueCapacities(queue, label);
CapacitySchedulerQueueCapacityHandler.setQueueCapacities(
resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label), queue, label);
}

@Override
Original file line number Diff line number Diff line change
@@ -200,20 +200,6 @@ public float getAbsoluteCapacity() {
return queueCapacities.getAbsoluteCapacity();
}

@Override
public ResourceVector getOrCreateAbsoluteMinCapacityVector(String label) {
usageTracker.getAbsoluteMinCapacityVector().putIfAbsent(label, ResourceVector.newInstance());

return usageTracker.getAbsoluteMinCapacityVector().get(label);
}

@Override
public ResourceVector getOrCreateAbsoluteMaxCapacityVector(String label) {
usageTracker.getAbsoluteMaxCapacityVector().putIfAbsent(label, ResourceVector.newInstance());

return usageTracker.getAbsoluteMaxCapacityVector().get(label);
}

@Override
public float getAbsoluteMaximumCapacity() {
return queueCapacities.getAbsoluteMaximumCapacity();
Original file line number Diff line number Diff line change
@@ -19,16 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ResourceCalculationDriver.CalculationContext;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;

import java.util.Map;
import java.util.Set;

import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;

/**
* A strategy class to encapsulate queue capacity setup and resource calculation
* logic.
@@ -61,7 +53,7 @@ public abstract void updateCapacitiesAfterCalculation(
* @param label node label
* @return minimum effective resource
*/
public abstract float calculateMinimumResource(ResourceCalculationDriver resourceCalculationDriver,
public abstract double calculateMinimumResource(ResourceCalculationDriver resourceCalculationDriver,
CalculationContext context,
String label);

@@ -73,7 +65,7 @@ public abstract float calculateMinimumResource(ResourceCalculationDriver resourc
* @param label node label
* @return minimum effective resource
*/
public abstract float calculateMaximumResource(ResourceCalculationDriver resourceCalculationDriver,
public abstract double calculateMaximumResource(ResourceCalculationDriver resourceCalculationDriver,
CalculationContext context,
String label);

@@ -84,12 +76,6 @@ public abstract float calculateMaximumResource(ResourceCalculationDriver resourc
* calculation should be made
*/
public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
9uapaw marked this conversation as resolved.
Show resolved Hide resolved
for (String label : resourceCalculationDriver.getParent().getConfiguredNodeLabels()) {
// We need to set normalized resource ratio only once per parent
if (resourceCalculationDriver.getNormalizedResourceRatios().isEmpty()) {
setNormalizedResourceRatio(resourceCalculationDriver, label);
}
}
}

/**
@@ -117,71 +103,4 @@ protected Set<String> getResourceNames(CSQueue queue, String label,
return queue.getConfiguredCapacityVector(label)
.getResourceNamesByCapacityType(capacityType);
}

/**
* Calculates the normalized resource ratio of a parent queue, under which children are defined
* with absolute capacity type. If the effective resource of the parent is less, than the
* aggregated configured absolute resource of its children, the resource ratio will be less,
* than 1.
*
* @param label node label
*/
private void setNormalizedResourceRatio(
ResourceCalculationDriver resourceCalculationDriver, String label) {
// ManagedParents assign zero capacity to queues in case of overutilization, downscaling is
// turned off for their children
CSQueue parentQueue = resourceCalculationDriver.getParent();
QueueCapacityUpdateContext updateContext = resourceCalculationDriver.getUpdateContext();

if (parentQueue instanceof ManagedParentQueue) {
return;
}

for (QueueCapacityVectorEntry capacityVectorEntry : parentQueue.getConfiguredCapacityVector(
label)) {
String resourceName = capacityVectorEntry.getResourceName();
long childrenConfiguredResource = 0;
long effectiveMinResource = parentQueue.getQueueResourceQuotas().getEffectiveMinResource(
label).getResourceValue(resourceName);

// Total configured min resources of direct children of this given parent
// queue
for (CSQueue childQueue : parentQueue.getChildQueues()) {
if (!childQueue.getConfiguredNodeLabels().contains(label)) {
continue;
}
QueueCapacityVector capacityVector = childQueue.getConfiguredCapacityVector(label);
if (capacityVector.isResourceOfType(resourceName, ResourceUnitCapacityType.ABSOLUTE)) {
childrenConfiguredResource += capacityVector.getResource(resourceName).getResourceValue();
}
}
// If no children is using ABSOLUTE capacity type, normalization is not needed
if (childrenConfiguredResource == 0) {
continue;
}
// Factor to scale down effective resource: When cluster has sufficient
// resources, effective_min_resources will be same as configured
// min_resources.
float numeratorForMinRatio = childrenConfiguredResource;
if (effectiveMinResource < childrenConfiguredResource) {
numeratorForMinRatio = parentQueue.getQueueResourceQuotas().getEffectiveMinResource(label)
.getResourceValue(resourceName);
updateContext.addUpdateWarning(QueueUpdateWarningType.BRANCH_DOWNSCALED.ofQueue(
parentQueue.getQueuePath()));
}

String unit = resourceName.equals(MEMORY_URI) ? ResourceCalculationDriver.MB_UNIT : "";
long convertedValue = UnitsConversionUtil.convert(unit,
updateContext.getUpdatedClusterResource(label).getResourceInformation(resourceName)
.getUnits(), childrenConfiguredResource);

if (convertedValue != 0) {
Map<String, ResourceVector> normalizedResourceRatios = resourceCalculationDriver
.getNormalizedResourceRatios();
normalizedResourceRatios.putIfAbsent(label, ResourceVector.newInstance());
normalizedResourceRatios.get(label).setValue(resourceName, numeratorForMinRatio /
convertedValue);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -115,10 +115,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
*/
public float getAbsoluteCapacity();

public ResourceVector getOrCreateAbsoluteMinCapacityVector(String label);

public ResourceVector getOrCreateAbsoluteMaxCapacityVector(String label);

/**
* Get the configured maximum-capacity of the queue.
* @return the configured maximum-capacity of the queue
Original file line number Diff line number Diff line change
@@ -41,9 +41,6 @@ public class CSQueueUsageTracker {

private final QueueResourceQuotas queueResourceQuotas;

private final Map<String, ResourceVector> absoluteMinCapacityVector = new HashMap<>();
private final Map<String, ResourceVector> absoluteMaxCapacityVector = new HashMap<>();

public CSQueueUsageTracker(CSQueueMetrics metrics) {
this.metrics = metrics;
this.queueUsage = new ResourceUsage();
@@ -82,11 +79,4 @@ public QueueResourceQuotas getQueueResourceQuotas() {
return queueResourceQuotas;
}

public Map<String, ResourceVector> getAbsoluteMinCapacityVector() {
return absoluteMinCapacityVector;
}

public Map<String, ResourceVector> getAbsoluteMaxCapacityVector() {
return absoluteMaxCapacityVector;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;

/**
* A storage class that wraps arguments used in a resource calculation iteration.
*/
public class CalculationContext {
private final String resourceName;
private final QueueCapacityVector.ResourceUnitCapacityType capacityType;
private final CSQueue childQueue;
9uapaw marked this conversation as resolved.
Show resolved Hide resolved

public CalculationContext(String resourceName, QueueCapacityVector.ResourceUnitCapacityType capacityType, CSQueue childQueue) {
this.resourceName = resourceName;
this.capacityType = capacityType;
this.childQueue = childQueue;
}

public String getResourceName() {
return resourceName;
}

public QueueCapacityVector.ResourceUnitCapacityType getCapacityType() {
return capacityType;
}

public CSQueue getChildQueue() {
return childQueue;
}

/**
* A shorthand to return the minimum capacity vector entry for the currently evaluated child and
* resource name.
*
* @param label node label
* @return capacity vector entry
*/
public QueueCapacityVectorEntry getCurrentMinimumCapacityEntry(String label) {
return childQueue.getConfiguredCapacityVector(label).getResource(resourceName);
}

/**
* A shorthand to return the maximum capacity vector entry for the currently evaluated child and
* resource name.
*
* @param label node label
* @return capacity vector entry
*/
public QueueCapacityVectorEntry getCurrentMaximumCapacityEntry(String label) {
return childQueue.getConfiguredMaxCapacityVector(label).getResource(resourceName);
}
}
Loading