Fix Issues Arising from Confusing Weighted Costs #3758

Merged · 4 commits · Jun 28, 2024

Changes from 3 commits
4 changes: 2 additions & 2 deletions internal/scheduler/context/context.go
@@ -164,7 +164,7 @@ func (sctx *SchedulingContext) GetQueue(queue string) (fairness.Queue, bool) {
 func (sctx *SchedulingContext) TotalCost() float64 {
 	var rv float64
 	for _, qctx := range sctx.QueueSchedulingContexts {
-		rv += sctx.FairnessCostProvider.CostFromQueue(qctx)
+		rv += sctx.FairnessCostProvider.UnweightedCostFromQueue(qctx)
 	}
 	return rv
 }
@@ -187,7 +187,7 @@ func (sctx *SchedulingContext) UpdateFairShares() {
 	for queueName, qctx := range sctx.QueueSchedulingContexts {
 		cappedShare := 1.0
 		if !sctx.TotalResources.IsZero() {
-			cappedShare = sctx.FairnessCostProvider.CostFromAllocationAndWeight(qctx.Demand, qctx.Weight) * qctx.Weight
+			cappedShare = sctx.FairnessCostProvider.WeightedCostFromAllocation(qctx.Demand, qctx.Weight) * qctx.Weight
 		}
 		queueInfos = append(queueInfos, &queueInfo{
 			queueName: queueName,
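The two call sites above are easy to conflate. TotalCost now sums unweighted costs, while UpdateFairShares multiplies a weighted cost back by the queue's weight, which simply recovers the unweighted cost of the queue's demand. A minimal sketch of that identity, using a one-resource stand-in for the cost provider (all names here are illustrative, not Armada's):

```go
package main

import "fmt"

// Sketch only: a one-resource stand-in for the FairnessCostProvider calls
// above. The real provider is the DominantResourceFairness type below.
func unweightedCost(alloc, capacity float64) float64 { return alloc / capacity }

func weightedCost(alloc, capacity, weight float64) float64 {
	return unweightedCost(alloc, capacity) / weight
}

func main() {
	demand, capacity, weight := 8.0, 32.0, 2.0
	// UpdateFairShares computes WeightedCostFromAllocation(demand, weight) * weight,
	// which recovers the unweighted cost of the queue's demand:
	cappedShare := weightedCost(demand, capacity, weight) * weight
	fmt.Println(cappedShare, unweightedCost(demand, capacity)) // 0.25 0.25
}
```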
22 changes: 16 additions & 6 deletions internal/scheduler/fairness/fairness.go
@@ -21,8 +21,10 @@ type Queue interface {
 
 // FairnessCostProvider captures algorithms to compute the cost of an allocation.
 type FairnessCostProvider interface {
-	CostFromQueue(queue Queue) float64
-	CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64
+	UnweightedCostFromQueue(queue Queue) float64
+	UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64
+	WeightedCostFromQueue(queue Queue) float64
+	WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64
 }
 
 type DominantResourceFairness struct {
@@ -42,11 +44,19 @@ func NewDominantResourceFairness(totalResources schedulerobjects.ResourceList, r
 	}, nil
 }
 
-func (f *DominantResourceFairness) CostFromQueue(queue Queue) float64 {
-	return f.CostFromAllocationAndWeight(queue.GetAllocation(), queue.GetWeight())
+func (f *DominantResourceFairness) WeightedCostFromQueue(queue Queue) float64 {
+	return f.UnweightedCostFromQueue(queue) / queue.GetWeight()
 }
 
-func (f *DominantResourceFairness) CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64 {
+func (f *DominantResourceFairness) UnweightedCostFromQueue(queue Queue) float64 {
+	return f.UnweightedCostFromAllocation(queue.GetAllocation())
+}
+
+func (f *DominantResourceFairness) WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64 {
+	return f.UnweightedCostFromAllocation(allocation) / weight
+}
+
+func (f *DominantResourceFairness) UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64 {
 	var cost float64
 	for _, t := range f.resourcesToConsider {
 		capacity := f.totalResources.Get(t)
@@ -60,5 +70,5 @@ func (f *DominantResourceFairness) CostFromAllocationAndWeight(allocation schedu
 			cost = tcost
 		}
 	}
-	return cost / weight
+	return cost
 }
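For readers skimming the rename: the unweighted cost is the queue's dominant resource share (the maximum, over the considered resources, of allocation divided by capacity), and the weighted cost divides that by the queue weight. A self-contained sketch under those definitions, with map[string]float64 standing in for schedulerobjects.ResourceList (the zero-capacity guard is an assumption, since that branch is elided from the hunk above):

```go
package main

import "fmt"

// drf mirrors DominantResourceFairness above, with plain maps standing in
// for schedulerobjects.ResourceList.
type drf struct {
	totalResources      map[string]float64
	resourcesToConsider []string
}

// unweightedCostFromAllocation is the dominant share: the max, over the
// considered resources, of allocation / capacity.
func (f drf) unweightedCostFromAllocation(alloc map[string]float64) float64 {
	var cost float64
	for _, t := range f.resourcesToConsider {
		capacity := f.totalResources[t]
		if capacity == 0 {
			continue // assumption: skip resources the pool does not have
		}
		if tcost := alloc[t] / capacity; tcost > cost {
			cost = tcost
		}
	}
	return cost
}

// weightedCostFromAllocation divides by the queue weight, so a
// higher-weight queue looks cheaper to the scheduler.
func (f drf) weightedCostFromAllocation(alloc map[string]float64, weight float64) float64 {
	return f.unweightedCostFromAllocation(alloc) / weight
}

func main() {
	f := drf{
		totalResources:      map[string]float64{"cpu": 32, "memory": 128},
		resourcesToConsider: []string{"cpu", "memory"},
	}
	alloc := map[string]float64{"cpu": 8, "memory": 16}
	fmt.Println(f.unweightedCostFromAllocation(alloc))    // 0.25 (cpu dominates)
	fmt.Println(f.weightedCostFromAllocation(alloc, 2.0)) // 0.125
}
```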
6 changes: 3 additions & 3 deletions internal/scheduler/fairness/fairness_test.go
@@ -156,12 +156,12 @@ func TestDominantResourceFairness(t *testing.T) {
 			assert.Equal(
 				t,
 				tc.expectedCost,
-				f.CostFromAllocationAndWeight(tc.allocation, tc.weight),
+				f.WeightedCostFromAllocation(tc.allocation, tc.weight),
 			)
 			assert.Equal(
 				t,
-				f.CostFromAllocationAndWeight(tc.allocation, tc.weight),
-				f.CostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}),
+				f.WeightedCostFromAllocation(tc.allocation, tc.weight),
+				f.WeightedCostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}),
 			)
 		})
 	}
2 changes: 1 addition & 1 deletion internal/scheduler/preempting_queue_scheduler.go
@@ -131,7 +131,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
 			return false
 		}
 		if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.Queue()]; ok {
-			actualShare := sch.schedulingContext.FairnessCostProvider.CostFromQueue(qctx) / totalCost
+			actualShare := sch.schedulingContext.FairnessCostProvider.UnweightedCostFromQueue(qctx) / totalCost
 			fairShare := qctx.FairShare
 			if sch.useAdjustedFairShareProtection {
 				fairShare = math.Max(qctx.AdjustedFairShare, fairShare)
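Switching this call site to UnweightedCostFromQueue matters because actualShare is compared against FairShare: only unweighted costs sum to the occupied fraction of the pool, so dividing by totalCost yields each queue's true share. A worked example with made-up numbers (illustrative only, not taken from Armada's fixtures):

```go
package main

import "fmt"

func main() {
	// Made-up allocations: queue A holds 24 of 32 CPUs (weight 1),
	// queue B holds 8 of 32 (weight 2).
	costA := 24.0 / 32.0 // unweighted cost of A = 0.75
	costB := 8.0 / 32.0  // unweighted cost of B = 0.25

	// Unweighted costs sum to the occupied fraction of the pool, so
	// dividing by the total yields each queue's actual share.
	total := costA + costB                // 1.0
	fmt.Println(costA/total, costB/total) // 0.75 0.25

	// The old weighted CostFromQueue divided B's cost by its weight, so
	// the resulting "shares" overstated A and understated B:
	oldTotal := costA + costB/2.0                   // 0.875
	fmt.Println(costA/oldTotal, (costB/2)/oldTotal) // ≈0.857 ≈0.143
}
```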
31 changes: 31 additions & 0 deletions internal/scheduler/preempting_queue_scheduler_test.go
@@ -1312,6 +1312,37 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 				"D": 1,
 			},
 		},
+		"ProtectedFractionOfFairShare non equal weights": {
[Review comment from the PR author on the line above] This unit test demonstrates the issue (a worked sketch follows this file's diff).
+			SchedulingConfig: testfixtures.WithProtectedFractionOfFairShareConfig(
+				1.0,
+				testfixtures.TestSchedulingConfig(),
+			),
+			Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
+			Rounds: []SchedulingRound{
+				{
+					JobsByQueue: map[string][]*jobdb.Job{
+						"A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass2NonPreemptible, 24),
+						"B": testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 8),
+					},
+					ExpectedScheduledIndices: map[string][]int{
+						"A": testfixtures.IntRange(0, 23),
+						"B": testfixtures.IntRange(0, 7),
+					},
+				},
+				{
+					// C submits one more job. No preemption occurs because B is below adjusted fair share.
+					JobsByQueue: map[string][]*jobdb.Job{
+						"C": testfixtures.N1Cpu4GiJobs("C", testfixtures.PriorityClass0, 1),
+					},
+				},
+				{}, // Empty round to make sure nothing changes.
+			},
+			PriorityFactorByQueue: map[string]float64{
+				"A": 1,
+				"B": 2,
+				"C": 1,
+			},
+		},
"DominantResourceFairness": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
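A hedged reading of why this test exercises the fix: with ProtectedFractionOfFairShare set to 1.0, a queue whose actual share does not exceed its (possibly adjusted) fair share should be protected from preemption, so the actual share must be a true pool fraction — i.e., computed from unweighted costs — for the comparison to mean anything. The sketch below captures that protection rule as the visible hunk in preempting_queue_scheduler.go suggests it; the comparison itself is cut off in that hunk, so treat this as an assumption rather than Armada's code:

```go
package main

import "fmt"

// Assumed shape of the protection check, inferred from the visible hunk in
// preempting_queue_scheduler.go above; the real comparison is elided.
func isProtected(actualShare, fairShare, protectedFraction float64) bool {
	// A queue at or below protectedFraction * fairShare keeps its jobs.
	return actualShare <= protectedFraction*fairShare
}

func main() {
	// Illustrative numbers: with protectedFraction = 1.0, a queue at or
	// below its fair share cannot be preempted, which is what the rounds
	// above assert for queue B after C submits its job.
	fmt.Println(isProtected(0.25, 0.5, 1.0)) // true: protected
	fmt.Println(isProtected(0.75, 0.5, 1.0)) // false: above fair share
}
```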
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler.go
@@ -465,7 +465,7 @@ func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSc
 	it.buffer.Zero()
 	it.buffer.Add(queue.GetAllocation())
 	it.buffer.Add(gctx.TotalResourceRequests)
-	return it.fairnessCostProvider.CostFromAllocationAndWeight(it.buffer, queue.GetWeight()), nil
+	return it.fairnessCostProvider.WeightedCostFromAllocation(it.buffer, queue.GetWeight()), nil
 }
 
 // Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job.
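This call site keeps the weighted variant deliberately: CandidateGangIterator picks the queue whose cost-if-scheduled is lowest, and dividing by the weight makes higher-weight queues look cheaper, so they receive jobs first. A toy sketch of that ordering effect (names and numbers are illustrative):

```go
package main

import "fmt"

// Toy version of the weighted cost used to order candidate queues:
// (allocation after adding the gang) / capacity, divided by weight.
func weightedCostIfScheduled(allocAfterGang, capacity, weight float64) float64 {
	return (allocAfterGang / capacity) / weight
}

func main() {
	// Two queues that would each reach 8 of 32 CPUs after their next gang.
	fmt.Println(weightedCostIfScheduled(8, 32, 1.0)) // 0.25
	fmt.Println(weightedCostIfScheduled(8, 32, 2.0)) // 0.125 — picked first
}
```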
2 changes: 1 addition & 1 deletion internal/scheduler/scheduler_metrics.go
@@ -199,7 +199,7 @@ func (metrics *SchedulerMetrics) calculateQueuePoolMetrics(schedulingContexts []
 
 	for queue, queueContext := range schedContext.QueueSchedulingContexts {
 		key := queuePoolKey{queue: queue, pool: pool}
-		actualShare := schedContext.FairnessCostProvider.CostFromQueue(queueContext) / totalCost
+		actualShare := schedContext.FairnessCostProvider.UnweightedCostFromQueue(queueContext) / totalCost
 		result[key] = queuePoolData{
 			numberOfJobsConsidered: len(queueContext.UnsuccessfulJobSchedulingContexts) + len(queueContext.SuccessfulJobSchedulingContexts),
 			fairShare: queueContext.FairShare,