Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
avoid unnecessary preemption for tiny queues under certain corner cases
Browse files Browse the repository at this point in the history
Jian Chen committed Mar 26, 2022

Verified

This commit was signed with the committer’s verified signature.
fiji-flo Florian Dieminger
1 parent da09d68 commit 27ab492
Showing 2 changed files with 155 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -31,12 +31,16 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}.
*/
public class AbstractPreemptableResourceCalculator {
private static final Logger LOG = LoggerFactory.getLogger(
AbstractPreemptableResourceCalculator.class);

protected final CapacitySchedulerPreemptionContext context;
protected final ResourceCalculator rc;
@@ -76,6 +80,34 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
}
}

private static class NormalizationTuple {
private Resource numerator;
private Resource denominator;

NormalizationTuple(Resource numer, Resource denom) {
this.numerator = numer;
this.denominator = denom;
}

long getNumeratorValue(int i) {
return numerator.getResourceInformation(i).getValue();
}

long getDenominatorValue(int i) {
String nUnits = numerator.getResourceInformation(i).getUnits();
ResourceInformation dResourceInformation = denominator
.getResourceInformation(i);
return UnitsConversionUtil.convert(
dResourceInformation.getUnits(), nUnits, dResourceInformation.getValue());
}

float getNormalizedValue(int i) {
long nValue = getNumeratorValue(i);
long dValue = getDenominatorValue(i);
return dValue == 0 ? 0.0f : (float) nValue / dValue;
}
}

/**
* PreemptableResourceCalculator constructor.
*
@@ -175,7 +207,7 @@ protected void computeFixpointAllocation(Resource totGuarant,
unassigned, Resources.none())) {
// we compute normalizedGuarantees capacity based on currently active
// queues
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
resetCapacity(orderedByNeed, ignoreGuarantee);

// For each underserved queue (or set of queues if multiple are equally
// underserved), offer its share of the unassigned resources based on its
@@ -252,47 +284,145 @@ protected void initIdealAssignment(Resource totGuarant,
/**
* Computes a normalizedGuaranteed capacity based on active queues.
*
* @param clusterResource
* the total amount of resources in the cluster
* @param queues
* the list of queues to consider
* @param ignoreGuar
* ignore guarantee.
*/
private void resetCapacity(Resource clusterResource,
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
private void resetCapacity(Collection<TempQueuePerPartition> queues,
boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
float activeTotalAbsCap = 0.0f;
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();

if (ignoreGuar) {
for (TempQueuePerPartition q : queues) {
for (int i = 0; i < maxLength; i++) {
q.normalizedGuarantee[i] = 1.0f / queues.size();
for (int i = 0; i < maxLength; i++) {
for (TempQueuePerPartition q : queues) {
computeNormGuarEvenly(q, queues.size(), i);
}
}
} else {
for (TempQueuePerPartition q : queues) {
Resources.addTo(activeCap, q.getGuaranteed());
activeTotalAbsCap += q.getAbsCapacity();
}
for (TempQueuePerPartition q : queues) {
for (int i = 0; i < maxLength; i++) {
ResourceInformation nResourceInformation = q.getGuaranteed()
.getResourceInformation(i);
ResourceInformation dResourceInformation = activeCap
.getResourceInformation(i);

long nValue = nResourceInformation.getValue();
long dValue = UnitsConversionUtil.convert(
dResourceInformation.getUnits(), nResourceInformation.getUnits(),
dResourceInformation.getValue());
if (dValue != 0) {
q.normalizedGuarantee[i] = (float) nValue / dValue;

// loop through all resource types and normalize guaranteed capacity for all queues
for (int i = 0; i < maxLength; i++) {
boolean useAbsCapBasedNorm = false;
// if the sum of absolute capacity of all queues involved is 0,
// we should normalize evenly
boolean useEvenlyDistNorm = activeTotalAbsCap == 0;

// loop through all the queues once to determine the
// right normalization strategy for current processing resource type
for (TempQueuePerPartition q : queues) {
NormalizationTuple normTuple = new NormalizationTuple(
q.getGuaranteed(), activeCap);
long queueGuaranValue = normTuple.getNumeratorValue(i);
long totalActiveGuaranValue = normTuple.getDenominatorValue(i);

if (queueGuaranValue == 0 && q.getAbsCapacity() != 0 && totalActiveGuaranValue != 0) {
// when the rounded value of a resource type is 0 but its absolute capacity is not 0,
// we should consider taking the normalized guarantee based on absolute capacity
useAbsCapBasedNorm = true;
break;
}

if (totalActiveGuaranValue == 0) {
// If totalActiveGuaranValue from activeCap is zero, that means the guaranteed capacity
// of this resource dimension for all active queues is tiny (close to 0).
// For example, if a queue has 1% of minCapacity on a cluster with a totalVcores of 48,
// then the idealAssigned Vcores for this queue is (48 * 0.01)=0.48 which then
// get rounded/casted into 0 (double -> long)
// In this scenario where the denominator is 0, we can just spread resources across
// all tiny queues evenly since their absoluteCapacity are roughly the same
useEvenlyDistNorm = true;
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Queue normalization strategy: " +
"absoluteCapacityBasedNormalization(" + useAbsCapBasedNorm +
"), evenlyDistributedNormalization(" + useEvenlyDistNorm +
"), defaultNormalization(" + !(useAbsCapBasedNorm || useEvenlyDistNorm) + ")");
}

// loop through all the queues again to apply normalization strategy
for (TempQueuePerPartition q : queues) {
if (useAbsCapBasedNorm) {
computeNormGuarFromAbsCapacity(q, activeTotalAbsCap, i);
} else if (useEvenlyDistNorm) {
computeNormGuarEvenly(q, queues.size(), i);
} else {
computeDefaultNormGuar(q, activeCap, i);
}
}
}
}
}

/**
* Computes the normalized guaranteed capacity based on the weight of a queue's abs capacity
*
* Example:
* There are two active queues: queueA & queueB, and
* their configured absolute minimum capacity is 1% and 3% respectively.
*
* Then their normalized guaranteed capacity are:
* normalized_guar_queueA = 0.01 / (0.01 + 0.03) = 0.25
* normalized_guar_queueB = 0.03 / (0.01 + 0.03) = 0.75
*
* @param q
* the queue to consider
* @param activeTotalAbsCap
* the sum of absolute capacity of all active queues
* @param resourceTypeIdx
* index of the processing resource type
*/
private static void computeNormGuarFromAbsCapacity(TempQueuePerPartition q,
float activeTotalAbsCap,
int resourceTypeIdx) {
if (activeTotalAbsCap != 0) {
q.normalizedGuarantee[resourceTypeIdx] = q.getAbsCapacity() / activeTotalAbsCap;
}
}
/**
* Computes the normalized guaranteed capacity evenly based on num of active queues
*
* @param q
* the queue to consider
* @param numOfActiveQueues
* number of active queues
* @param resourceTypeIdx
* index of the processing resource type
*/
private static void computeNormGuarEvenly(TempQueuePerPartition q,
int numOfActiveQueues,
int resourceTypeIdx) {
q.normalizedGuarantee[resourceTypeIdx] = 1.0f / numOfActiveQueues;
}

/**
* The default way to compute a queue's normalized guaranteed capacity
*
* For each resource type, divide a queue's configured guaranteed amount (MBs/Vcores) by
* the total amount of guaranteed resource of all active queues
*
* @param q
* the queue to consider
* @param activeCap
* total guaranteed resources of all active queues
* @param resourceTypeIdx
* index of the processing resource type
*/
private static void computeDefaultNormGuar(TempQueuePerPartition q,
Resource activeCap,
int resourceTypeIdx) {
NormalizationTuple normTuple = new NormalizationTuple(q.getGuaranteed(), activeCap);
q.normalizedGuarantee[resourceTypeIdx] = normTuple.getNormalizedValue(resourceTypeIdx);
}

// Take the most underserved TempQueue (the one on the head). Collect and
// return the list of all queues that have the same idealAssigned
// percentage of guaranteed.
Original file line number Diff line number Diff line change
@@ -201,6 +201,10 @@ Resource offer(Resource avail, ResourceCalculator rc,
return remain;
}

public float getAbsCapacity() {
return absCapacity;
}

public Resource getGuaranteed() {
if(!effMinRes.equals(Resources.none())) {
return Resources.clone(effMinRes);

0 comments on commit 27ab492

Please sign in to comment.