Make Adjusted Fair Share Constraint Aware (#3798)
* implement cap

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
Co-authored-by: Chris Martin <[email protected]>
d80tb7 authored Jul 19, 2024
1 parent 1fd9a0a commit 70039be
Showing 10 changed files with 229 additions and 42 deletions.
49 changes: 30 additions & 19 deletions internal/scheduler/constraints/constraints.go
@@ -1,7 +1,6 @@
 package constraints
 
 import (
-	"fmt"
 	"math"
 
 	"github.com/pkg/errors"
@@ -193,40 +192,52 @@ func (constraints *SchedulingConstraints) CheckConstraints(
 	}
 
 	// queueSchedulingConstraintsByQueueName / priorityClassSchedulingConstraintsByPriorityClassName checks.
-	queueAndPriorityClassResourceLimits := constraints.getQueueAndPriorityClassResourceLimits(gctx)
-	priorityClassResourceLimits := constraints.getPriorityClassResourceLimits(gctx)
-	overallResourceLimits := util.MergeMaps(priorityClassResourceLimits, queueAndPriorityClassResourceLimits)
+	overallResourceLimits := constraints.resolveResourceLimitsForQueueAndPriorityClass(gctx.Queue, gctx.PriorityClassName)
 	if !isStrictlyLessOrEqual(qctx.AllocatedByPriorityClass[gctx.PriorityClassName].Resources, overallResourceLimits) {
 		return false, UnschedulableReasonMaximumResourcesExceeded, nil
 	}
 
 	return true, "", nil
 }
 
-func (constraints *SchedulingConstraints) getQueueAndPriorityClassResourceLimits(gctx *schedulercontext.GangSchedulingContext) map[string]resource.Quantity {
-	if queueConstraint, ok := constraints.queueSchedulingConstraintsByQueueName[gctx.Queue]; ok {
-		if priorityClassConstraint, ok := queueConstraint.PriorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok {
-			return priorityClassConstraint.MaximumResourcesPerQueue
-		}
-	}
-	return map[string]resource.Quantity{}
-}
-
-func (constraints *SchedulingConstraints) getPriorityClassResourceLimits(gctx *schedulercontext.GangSchedulingContext) map[string]resource.Quantity {
-	if priorityClassConstraint, ok := constraints.priorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok {
-		return priorityClassConstraint.MaximumResourcesPerQueue
-	}
-	return map[string]resource.Quantity{}
-}
-
-func RequestsAreLargeEnough(totalResourceRequests, minRequest map[string]resource.Quantity) (bool, string) {
-	for t, minQuantity := range minRequest {
-		q := totalResourceRequests[t]
-		if minQuantity.Cmp(q) == 1 {
-			return false, fmt.Sprintf("job requests %s %s, but the minimum is %s", q.String(), t, minQuantity.String())
-		}
-	}
-	return true, ""
-}
+func (constraints *SchedulingConstraints) CapResources(queue string, resourcesByPc schedulerobjects.QuantityByTAndResourceType[string]) schedulerobjects.QuantityByTAndResourceType[string] {
+	cappedResourcesByPc := schedulerobjects.QuantityByTAndResourceType[string]{}
+	for pc, resources := range resourcesByPc {
+		overallResourceLimits := constraints.resolveResourceLimitsForQueueAndPriorityClass(queue, pc)
+		cappedResources := make(map[string]resource.Quantity, len(resources.Resources))
+		for resourceName, qty := range resources.Resources {
+			limit, ok := overallResourceLimits[resourceName]
+			if ok && qty.Cmp(limit) == 1 {
+				cappedResources[resourceName] = limit
+			} else {
+				cappedResources[resourceName] = qty
+			}
+		}
+		cappedResourcesByPc[pc] = schedulerobjects.ResourceList{Resources: cappedResources}
+	}
+	return cappedResourcesByPc
+}
+
+func (constraints *SchedulingConstraints) resolveResourceLimitsForQueueAndPriorityClass(queue string, priorityClass string) map[string]resource.Quantity {
+	queueAndPriorityClassResourceLimits := constraints.getQueueAndPriorityClassResourceLimits(queue, priorityClass)
+	priorityClassResourceLimits := constraints.getPriorityClassResourceLimits(priorityClass)
+	return util.MergeMaps(priorityClassResourceLimits, queueAndPriorityClassResourceLimits)
+}
+
+func (constraints *SchedulingConstraints) getQueueAndPriorityClassResourceLimits(queue string, priorityClass string) map[string]resource.Quantity {
+	if queueConstraint, ok := constraints.queueSchedulingConstraintsByQueueName[queue]; ok {
+		if priorityClassConstraint, ok := queueConstraint.PriorityClassSchedulingConstraintsByPriorityClassName[priorityClass]; ok {
+			return priorityClassConstraint.MaximumResourcesPerQueue
+		}
+	}
+	return map[string]resource.Quantity{}
+}
+
+func (constraints *SchedulingConstraints) getPriorityClassResourceLimits(priorityClass string) map[string]resource.Quantity {
+	if priorityClassConstraint, ok := constraints.priorityClassSchedulingConstraintsByPriorityClassName[priorityClass]; ok {
+		return priorityClassConstraint.MaximumResourcesPerQueue
+	}
+	return map[string]resource.Quantity{}
+}
 
 func (constraints *SchedulingConstraints) GetMaxQueueLookBack() uint {
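For illustration only (not part of this diff): a minimal, self-contained sketch of the capping rule that CapResources applies, written against plain k8s resource.Quantity maps instead of Armada's schedulerobjects types. The helper name capResources and the example quantities are assumptions made for this sketch.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// capResources returns a copy of requested in which every quantity that has a
// limit is reduced to min(requested, limit); resources without a limit pass through.
func capResources(requested, limits map[string]resource.Quantity) map[string]resource.Quantity {
	capped := make(map[string]resource.Quantity, len(requested))
	for name, qty := range requested {
		if limit, ok := limits[name]; ok && qty.Cmp(limit) == 1 {
			capped[name] = limit
		} else {
			capped[name] = qty
		}
	}
	return capped
}

func main() {
	demand := map[string]resource.Quantity{
		"cpu":    resource.MustParse("1000"),
		"memory": resource.MustParse("1000Gi"),
	}
	// E.g. a 10% cpu / 90% memory cap resolved for this queue and priority class
	// on a 1000 cpu / 1000Gi pool.
	limits := map[string]resource.Quantity{
		"cpu":    resource.MustParse("100"),
		"memory": resource.MustParse("900Gi"),
	}
	for name, qty := range capResources(demand, limits) {
		fmt.Printf("%s: %s\n", name, qty.String())
	}
}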
143 changes: 143 additions & 0 deletions internal/scheduler/constraints/constraints_test.go
@@ -157,6 +157,149 @@ func TestConstraints(t *testing.T) {
}
}

func TestCapResources(t *testing.T) {
tests := map[string]struct {
constraints SchedulingConstraints
queue string
resources schedulerobjects.QuantityByTAndResourceType[string]
expectedResources schedulerobjects.QuantityByTAndResourceType[string]
}{
"no contraints": {
constraints: NewSchedulingConstraints(
"pool-1",
makeResourceList("1000", "1000Gi"),
makeSchedulingConfig(),
[]*api.Queue{},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
},
"unconstrained": {
constraints: NewSchedulingConstraints(
"pool-1",
makeResourceList("1000", "1000Gi"),
configuration.SchedulingConfig{
PriorityClasses: map[string]types.PriorityClass{
"priority-class-1": {
MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{
"pool-1": {"cpu": 0.1, "memory": 0.9},
},
},
},
},
[]*api.Queue{},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1", "1Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1", "1Gi")},
},
"per pool cap": {
constraints: NewSchedulingConstraints(
"pool-1",
makeResourceList("1000", "1000Gi"),
configuration.SchedulingConfig{
PriorityClasses: map[string]types.PriorityClass{
"priority-class-1": {
MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{
"pool-1": {"cpu": 0.1, "memory": 0.9},
},
},
},
},
[]*api.Queue{},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("100", "900Gi")},
},
"per queue cap": {
constraints: NewSchedulingConstraints(
"pool-1",
makeResourceList("1000", "1000Gi"),
configuration.SchedulingConfig{
PriorityClasses: map[string]types.PriorityClass{
"priority-class-1": {
MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{
"pool-1": {"cpu": 0.1, "memory": 0.9},
},
},
},
},
[]*api.Queue{
{
Name: "queue-1",
ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{
"priority-class-1": {
MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9},
},
},
},
},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("900", "900Gi")},
},
"per queue cap with multi pc": {
constraints: NewSchedulingConstraints(
"pool-1",
makeResourceList("1000", "1000Gi"),
configuration.SchedulingConfig{
PriorityClasses: map[string]types.PriorityClass{
"priority-class-1": {
MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{
"pool-1": {"cpu": 0.1, "memory": 0.9},
},
},
},
},
[]*api.Queue{
{
Name: "queue-1",
ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{
"priority-class-1": {
MaximumResourceFraction: map[string]float64{"cpu": 0.1, "memory": 0.1},
},
"priority-class-2": {
MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9},
},
},
},
},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{
"priority-class-1": makeResourceList("1000", "1000Gi"),
"priority-class-2": makeResourceList("2000", "2000Gi"),
},
expectedResources: map[string]schedulerobjects.ResourceList{
"priority-class-1": makeResourceList("100", "100Gi"),
"priority-class-2": makeResourceList("900", "900Gi"),
},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
capped := tc.constraints.CapResources(tc.queue, tc.resources)

// Compare resources for equality. Note that we can't just do assert.Equal(tc.expectedResources, capped)
// because the scale may have changed
require.Equal(t, len(tc.expectedResources), len(capped), "number of priority classes differs")
for pc, rl := range tc.expectedResources {
cappedRl, ok := capped[pc]
require.True(t, ok, "no resource list found for priority class %s", pc)
require.Equal(t, len(rl.Resources), len(cappedRl.Resources), "number of resources differs for priority class %s", pc)
for res, qty := range rl.Resources {
cappedRes, ok := cappedRl.Resources[res]
require.True(t, ok, "resource %s doesn't exist at priority class %s", res, pc)
assert.Equal(t, 0, qty.Cmp(cappedRes), "resource %s differs at priority class %s", res, pc)
}
}
})
}
}

func makeMultiLevelConstraintsTest(requirements map[string]resource.Quantity, expectedCheckConstraintsReason string, expectedCheckRoundConstraintsReason string) *constraintTest {
zeroResources := schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"a": resource.MustParse("0"), "b": resource.MustParse("0"), "c": resource.MustParse("0"), "d": resource.MustParse("0")},
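As an aside on the comparison loop in TestCapResources above: two resource.Quantity values that are numerically equal can differ in internal scale and cached string, so a struct-level assert.Equal can fail where Cmp reports equality — hence the per-resource Cmp check. A small standalone illustration (not from the diff):

package main

import (
	"fmt"
	"reflect"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	a := resource.MustParse("1")     // one cpu, expressed at scale 0
	b := resource.MustParse("1000m") // the same amount, expressed in millis
	fmt.Println(a.Cmp(b) == 0)           // true: numerically equal
	fmt.Println(reflect.DeepEqual(a, b)) // typically false: internal scale/cached string differ
}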
7 changes: 6 additions & 1 deletion internal/scheduler/context/context.go
@@ -109,6 +109,7 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(
 	queue string, weight float64,
 	initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string],
 	demand schedulerobjects.ResourceList,
+	cappedDemand schedulerobjects.ResourceList,
 	limiter *rate.Limiter,
 ) error {
 	if _, ok := sctx.QueueSchedulingContexts[queue]; ok {
@@ -138,6 +139,7 @@
 		Limiter: limiter,
 		Allocated: allocated,
 		Demand: demand,
+		CappedDemand: cappedDemand,
 		AllocatedByPriorityClass: initialAllocatedByPriorityClass,
 		ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
 		EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
@@ -186,7 +188,7 @@ func (sctx *SchedulingContext) UpdateFairShares() {
 	for queueName, qctx := range sctx.QueueSchedulingContexts {
 		cappedShare := 1.0
 		if !sctx.TotalResources.IsZero() {
-			cappedShare = sctx.FairnessCostProvider.UnweightedCostFromAllocation(qctx.Demand)
+			cappedShare = sctx.FairnessCostProvider.UnweightedCostFromAllocation(qctx.CappedDemand)
 		}
 		queueInfos = append(queueInfos, &queueInfo{
 			queueName: queueName,
@@ -414,6 +416,9 @@ type QueueSchedulingContext struct {
 	// Total demand from this queue. This is essentially the cumulative resources of all non-terminal jobs at the
 	// start of the scheduling cycle
 	Demand schedulerobjects.ResourceList
+	// Capped Demand for this queue. This differs from Demand in that it takes into account any limits that we have
+	// placed on the queue
+	CappedDemand schedulerobjects.ResourceList
 	// Fair share is the weight of this queue over the sum of the weights of all queues
 	FairShare float64
 	// AdjustedFairShare modifies fair share such that queues that have a demand cost less than their fair share, have their fair share reallocated.
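A rough sketch of why this matters for UpdateFairShares (assumptions: a single "cpu" resource and a cost function simplified to allocation divided by pool total, standing in for FairnessCostProvider.UnweightedCostFromAllocation):

package main

import "fmt"

// unweightedCost is a stand-in for UnweightedCostFromAllocation,
// reduced here to allocation / total for one resource.
func unweightedCost(allocation, total float64) float64 {
	if total == 0 {
		return 0
	}
	return allocation / total
}

func main() {
	totalCPU := 1000.0
	demandCPU := 800.0       // raw demand of the queue
	cappedDemandCPU := 100.0 // demand after applying a 10% per-queue cpu cap

	// Before this change the capped share fed into adjusted fair share was driven by raw demand ...
	fmt.Println("share from raw demand:   ", unweightedCost(demandCPU, totalCPU)) // 0.8
	// ... now it is driven by constraint-capped demand, so a queue is never handed
	// more adjusted fair share than its scheduling constraints allow it to use.
	fmt.Println("share from capped demand:", unweightedCost(cappedDemandCPU, totalCPU)) // 0.1
}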
4 changes: 2 additions & 2 deletions internal/scheduler/context/context_test.go
@@ -53,7 +53,7 @@ func TestSchedulingContextAccounting(t *testing.T) {
 		},
 	}
 	for _, queue := range []string{"A", "B"} {
-		err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue], schedulerobjects.ResourceList{}, nil)
+		err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue], schedulerobjects.ResourceList{}, schedulerobjects.ResourceList{}, nil)
 		require.NoError(t, err)
 	}
 
@@ -251,7 +251,7 @@ func TestCalculateFairShares(t *testing.T) {
 			)
 			for qName, q := range tc.queueCtxs {
 				err = sctx.AddQueueSchedulingContext(
-					qName, q.Weight, schedulerobjects.QuantityByTAndResourceType[string]{}, q.Demand, nil)
+					qName, q.Weight, schedulerobjects.QuantityByTAndResourceType[string]{}, q.Demand, q.Demand, nil)
 				require.NoError(t, err)
 			}
 			sctx.UpdateFairShares()
1 change: 1 addition & 0 deletions internal/scheduler/gang_scheduler_test.go
@@ -571,6 +571,7 @@ func TestGangScheduler(t *testing.T) {
 				priorityFactor,
 				nil,
 				schedulerobjects.NewResourceList(0),
+				schedulerobjects.NewResourceList(0),
 				rate.NewLimiter(
 					rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate),
 					tc.SchedulingConfig.MaximumPerQueueSchedulingBurst,
7 changes: 5 additions & 2 deletions internal/scheduler/preempting_queue_scheduler_test.go
@@ -1852,6 +1852,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 					weight,
 					allocatedByQueueAndPriorityClass[queue],
 					demandByQueue[queue],
+					demandByQueue[queue],
 					limiterByQueue[queue],
 				)
 				require.NoError(t, err)
@@ -2208,7 +2209,8 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
 		)
 		for queue, priorityFactor := range priorityFactorByQueue {
 			weight := 1 / priorityFactor
-			err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string]), schedulerobjects.NewResourceList(0), limiterByQueue[queue])
+			err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string]),
+				schedulerobjects.NewResourceList(0), schedulerobjects.NewResourceList(0), limiterByQueue[queue])
 			require.NoError(b, err)
 		}
 		constraints := schedulerconstraints.NewSchedulingConstraints(
@@ -2275,7 +2277,8 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
 		)
 		for queue, priorityFactor := range priorityFactorByQueue {
 			weight := 1 / priorityFactor
-			err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue], schedulerobjects.NewResourceList(0), limiterByQueue[queue])
+			err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue],
+				schedulerobjects.NewResourceList(0), schedulerobjects.NewResourceList(0), limiterByQueue[queue])
 			require.NoError(b, err)
 		}
 		sch := NewPreemptingQueueScheduler(
1 change: 1 addition & 0 deletions internal/scheduler/queue_scheduler_test.go
@@ -529,6 +529,7 @@ func TestQueueScheduler(t *testing.T) {
 				q.Name, weight,
 				tc.InitialAllocatedByQueueAndPriorityClass[q.Name],
 				schedulerobjects.NewResourceList(0),
+				schedulerobjects.NewResourceList(0),
 				rate.NewLimiter(
 					rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate),
 					tc.SchedulingConfig.MaximumPerQueueSchedulingBurst,
23 changes: 18 additions & 5 deletions internal/scheduler/scheduler_metrics.go
@@ -87,6 +87,15 @@ var demandPerQueueDesc = prometheus.NewDesc(
 	}, nil,
 )
 
+var cappedDemandPerQueueDesc = prometheus.NewDesc(
+	fmt.Sprintf("%s_%s_%s", NAMESPACE, SUBSYSTEM, "capped_demand"),
+	"Capped Demand of each queue and pool. This differs from demand in that it limits demand by scheduling constraints",
+	[]string{
+		"queue",
+		"pool",
+	}, nil,
+)
+
 var fairnessErrorDesc = prometheus.NewDesc(
 	fmt.Sprintf("%s_%s_%s", NAMESPACE, SUBSYSTEM, "fairness_error"),
 	"Cumulative delta between adjusted fair share and actual share for all users who are below their fair share",
@@ -174,10 +183,11 @@ func generateSchedulerMetrics(schedulingRoundData schedulingRoundData) []prometh
 
 	for key, value := range schedulingRoundData.queuePoolData {
 		result = append(result, prometheus.MustNewConstMetric(consideredJobsDesc, prometheus.GaugeValue, float64(value.numberOfJobsConsidered), key.queue, key.pool))
-		result = append(result, prometheus.MustNewConstMetric(fairSharePerQueueDesc, prometheus.GaugeValue, float64(value.fairShare), key.queue, key.pool))
-		result = append(result, prometheus.MustNewConstMetric(adjustedFairSharePerQueueDesc, prometheus.GaugeValue, float64(value.adjustedFairShare), key.queue, key.pool))
-		result = append(result, prometheus.MustNewConstMetric(actualSharePerQueueDesc, prometheus.GaugeValue, float64(value.actualShare), key.queue, key.pool))
-		result = append(result, prometheus.MustNewConstMetric(demandPerQueueDesc, prometheus.GaugeValue, float64(value.demand), key.queue, key.pool))
+		result = append(result, prometheus.MustNewConstMetric(fairSharePerQueueDesc, prometheus.GaugeValue, value.fairShare, key.queue, key.pool))
+		result = append(result, prometheus.MustNewConstMetric(adjustedFairSharePerQueueDesc, prometheus.GaugeValue, value.adjustedFairShare, key.queue, key.pool))
+		result = append(result, prometheus.MustNewConstMetric(actualSharePerQueueDesc, prometheus.GaugeValue, value.actualShare, key.queue, key.pool))
+		result = append(result, prometheus.MustNewConstMetric(demandPerQueueDesc, prometheus.GaugeValue, value.demand, key.queue, key.pool))
+		result = append(result, prometheus.MustNewConstMetric(cappedDemandPerQueueDesc, prometheus.GaugeValue, value.cappedDemand, key.queue, key.pool))
 	}
 	for key, value := range schedulingRoundData.scheduledJobData {
 		result = append(result, prometheus.MustNewConstMetric(scheduledJobsDesc, prometheus.CounterValue, float64(value), key.queue, key.priorityClass))
@@ -187,7 +197,7 @@ func generateSchedulerMetrics(schedulingRoundData schedulingRoundData) []prometh
 	}
 
 	for pool, fairnessError := range schedulingRoundData.fairnessError {
-		result = append(result, prometheus.MustNewConstMetric(fairnessErrorDesc, prometheus.CounterValue, fairnessError, pool))
+		result = append(result, prometheus.MustNewConstMetric(fairnessErrorDesc, prometheus.GaugeValue, fairnessError, pool))
 	}
 
 	return result
@@ -224,12 +234,14 @@ func (metrics *SchedulerMetrics) calculateQueuePoolMetrics(schedulingContexts []
 			key := queuePoolKey{queue: queue, pool: pool}
 			actualShare := schedContext.FairnessCostProvider.UnweightedCostFromQueue(queueContext)
 			demand := schedContext.FairnessCostProvider.UnweightedCostFromAllocation(queueContext.Demand)
+			cappedDemand := schedContext.FairnessCostProvider.UnweightedCostFromAllocation(queueContext.CappedDemand)
 			result[key] = queuePoolData{
 				numberOfJobsConsidered: len(queueContext.UnsuccessfulJobSchedulingContexts) + len(queueContext.SuccessfulJobSchedulingContexts),
 				fairShare: queueContext.FairShare,
 				adjustedFairShare: queueContext.AdjustedFairShare,
 				actualShare: actualShare,
 				demand: demand,
+				cappedDemand: cappedDemand,
 			}
 		}
 	}
@@ -274,4 +286,5 @@ type queuePoolData struct {
 	fairShare float64
 	adjustedFairShare float64
 	demand float64
+	cappedDemand float64
 }
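For reference, a hedged sketch of the const-metric pattern the new capped_demand gauge follows. The namespace and subsystem literals and the sample value here are assumptions for illustration; the real NAMESPACE/SUBSYSTEM constants and label values come from the scheduler package and scheduling round data.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// Illustrative stand-ins for the scheduler's NAMESPACE and SUBSYSTEM constants.
const namespace = "armada"
const subsystem = "scheduler"

var cappedDemandDesc = prometheus.NewDesc(
	fmt.Sprintf("%s_%s_%s", namespace, subsystem, "capped_demand"),
	"Capped demand of each queue and pool.",
	[]string{"queue", "pool"}, nil,
)

func main() {
	// Emit one constant gauge sample per (queue, pool), as generateSchedulerMetrics does.
	m := prometheus.MustNewConstMetric(cappedDemandDesc, prometheus.GaugeValue, 0.1, "queue-1", "pool-1")

	var out dto.Metric
	if err := m.Write(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.GetGauge().GetValue()) // 0.1
}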