From 03abe98f8882acbe6ac44f27c81bdd315f63eb81 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 27 Jun 2024 22:27:45 +0100 Subject: [PATCH 1/4] wip Signed-off-by: Chris Martin --- internal/scheduler/context/context.go | 4 ++-- internal/scheduler/fairness/fairness.go | 22 ++++++++++++----- internal/scheduler/fairness/fairness_test.go | 6 ++--- .../scheduler/preempting_queue_scheduler.go | 3 ++- .../preempting_queue_scheduler_test.go | 24 +++++++++++++++++++ internal/scheduler/queue_scheduler.go | 2 +- internal/scheduler/scheduler_metrics.go | 2 +- 7 files changed, 49 insertions(+), 14 deletions(-) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index b4bc129a8ce..4849fe69fb1 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -164,7 +164,7 @@ func (sctx *SchedulingContext) GetQueue(queue string) (fairness.Queue, bool) { func (sctx *SchedulingContext) TotalCost() float64 { var rv float64 for _, qctx := range sctx.QueueSchedulingContexts { - rv += sctx.FairnessCostProvider.CostFromQueue(qctx) + rv += sctx.FairnessCostProvider.UnweightedCostFromQueue(qctx) } return rv } @@ -187,7 +187,7 @@ func (sctx *SchedulingContext) UpdateFairShares() { for queueName, qctx := range sctx.QueueSchedulingContexts { cappedShare := 1.0 if !sctx.TotalResources.IsZero() { - cappedShare = sctx.FairnessCostProvider.CostFromAllocationAndWeight(qctx.Demand, qctx.Weight) * qctx.Weight + cappedShare = sctx.FairnessCostProvider.WeightedCostFromAllocation(qctx.Demand, qctx.Weight) * qctx.Weight } queueInfos = append(queueInfos, &queueInfo{ queueName: queueName, diff --git a/internal/scheduler/fairness/fairness.go b/internal/scheduler/fairness/fairness.go index 991a14fbe09..07b861bdb94 100644 --- a/internal/scheduler/fairness/fairness.go +++ b/internal/scheduler/fairness/fairness.go @@ -21,8 +21,10 @@ type Queue interface { // FairnessCostProvider captures algorithms to compute the cost of an allocation. 
type FairnessCostProvider interface { - CostFromQueue(queue Queue) float64 - CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64 + UnweightedCostFromQueue(queue Queue) float64 + UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64 + WeightedCostFromQueue(queue Queue) float64 + WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64 } type DominantResourceFairness struct { @@ -42,11 +44,19 @@ func NewDominantResourceFairness(totalResources schedulerobjects.ResourceList, r }, nil } -func (f *DominantResourceFairness) CostFromQueue(queue Queue) float64 { - return f.CostFromAllocationAndWeight(queue.GetAllocation(), queue.GetWeight()) +func (f *DominantResourceFairness) WeightedCostFromQueue(queue Queue) float64 { + return f.UnweightedCostFromQueue(queue) / queue.GetWeight() } -func (f *DominantResourceFairness) CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64 { +func (f *DominantResourceFairness) UnweightedCostFromQueue(queue Queue) float64 { + return f.UnweightedCostFromAllocation(queue.GetAllocation()) +} + +func (f *DominantResourceFairness) WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64 { + return f.UnweightedCostFromAllocation(allocation) / weight +} + +func (f *DominantResourceFairness) UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64 { var cost float64 for _, t := range f.resourcesToConsider { capacity := f.totalResources.Get(t) @@ -60,5 +70,5 @@ func (f *DominantResourceFairness) CostFromAllocationAndWeight(allocation schedu cost = tcost } } - return cost / weight + return cost } diff --git a/internal/scheduler/fairness/fairness_test.go b/internal/scheduler/fairness/fairness_test.go index 1f16b8db42c..00a96788f06 100644 --- a/internal/scheduler/fairness/fairness_test.go +++ b/internal/scheduler/fairness/fairness_test.go @@ -156,12 +156,12 
@@ func TestDominantResourceFairness(t *testing.T) { assert.Equal( t, tc.expectedCost, - f.CostFromAllocationAndWeight(tc.allocation, tc.weight), + f.WeightedCostFromAllocation(tc.allocation, tc.weight), ) assert.Equal( t, - f.CostFromAllocationAndWeight(tc.allocation, tc.weight), - f.CostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}), + f.WeightedCostFromAllocation(tc.allocation, tc.weight), + f.WeightedCostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}), ) }) } diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index dc6c8c069df..f45b645d9ea 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -131,12 +131,13 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche return false } if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.Queue()]; ok { - actualShare := sch.schedulingContext.FairnessCostProvider.CostFromQueue(qctx) / totalCost + actualShare := sch.schedulingContext.FairnessCostProvider.UnweightedCostFromQueue(qctx) / totalCost fairShare := qctx.FairShare if sch.useAdjustedFairShareProtection { fairShare = math.Max(qctx.AdjustedFairShare, fairShare) } fractionOfFairShare := actualShare / fairShare + ctx.Infof("queue=%s, fairShare=%.2f, actualShare=%.2f", qctx.Queue, fairShare, actualShare) if fractionOfFairShare <= sch.protectedFractionOfFairShare { return false } diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index e726c4b4a88..a73e5162209 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1312,6 +1312,30 @@ func TestPreemptingQueueScheduler(t *testing.T) { "D": 1, }, }, + "ProtectedFractionOfFairShare non equal weights": { + SchedulingConfig: 
testfixtures.WithProtectedFractionOfFairShareConfig( + 1.0, + testfixtures.TestSchedulingConfig(), + ), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Rounds: []SchedulingRound{ + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 16), + "B": testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 16), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 15), + "B": testfixtures.IntRange(0, 15), + }, + }, + {}, // Empty round to make sure nothing changes. + }, + PriorityFactorByQueue: map[string]float64{ + "A": 1, + "B": 10, + }, + }, "DominantResourceFairness": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index a3d5b33d8a9..de7cb68dd5c 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -465,7 +465,7 @@ func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSc it.buffer.Zero() it.buffer.Add(queue.GetAllocation()) it.buffer.Add(gctx.TotalResourceRequests) - return it.fairnessCostProvider.CostFromAllocationAndWeight(it.buffer, queue.GetWeight()), nil + return it.fairnessCostProvider.WeightedCostFromAllocation(it.buffer, queue.GetWeight()), nil } // Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job. 
diff --git a/internal/scheduler/scheduler_metrics.go b/internal/scheduler/scheduler_metrics.go index 04464bb7ac4..6a528f77c09 100644 --- a/internal/scheduler/scheduler_metrics.go +++ b/internal/scheduler/scheduler_metrics.go @@ -199,7 +199,7 @@ func (metrics *SchedulerMetrics) calculateQueuePoolMetrics(schedulingContexts [] for queue, queueContext := range schedContext.QueueSchedulingContexts { key := queuePoolKey{queue: queue, pool: pool} - actualShare := schedContext.FairnessCostProvider.CostFromQueue(queueContext) / totalCost + actualShare := schedContext.FairnessCostProvider.UnweightedCostFromQueue(queueContext) / totalCost result[key] = queuePoolData{ numberOfJobsConsidered: len(queueContext.UnsuccessfulJobSchedulingContexts) + len(queueContext.SuccessfulJobSchedulingContexts), fairShare: queueContext.FairShare, From 85a25023a6a1edbff5ee1cb6d5e562e7acb3b990 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 27 Jun 2024 22:33:59 +0100 Subject: [PATCH 2/4] wip Signed-off-by: Chris Martin --- .../preempting_queue_scheduler_test.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index a73e5162209..0bd656e64f4 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1321,19 +1321,26 @@ func TestPreemptingQueueScheduler(t *testing.T) { Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ - "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 16), - "B": testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 16), + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass2NonPreemptible, 24), + "B": testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 8), }, ExpectedScheduledIndices: map[string][]int{ - "A": testfixtures.IntRange(0, 15), - "B": testfixtures.IntRange(0, 15), + "A": 
testfixtures.IntRange(0, 23), + "B": testfixtures.IntRange(0, 7), + }, + }, + { + // C submits one more job. No preemption occurs because B is below adjusted fair share + JobsByQueue: map[string][]*jobdb.Job{ + "C": testfixtures.N1Cpu4GiJobs("C", testfixtures.PriorityClass0, 1), }, }, {}, // Empty round to make sure nothing changes. }, PriorityFactorByQueue: map[string]float64{ "A": 1, - "B": 10, + "B": 2, + "c": 1, }, }, "DominantResourceFairness": { From 6d470ce5031fa5aa101ea25460b3d83d3db01f06 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 27 Jun 2024 23:05:29 +0100 Subject: [PATCH 3/4] wip Signed-off-by: Chris Martin --- internal/scheduler/preempting_queue_scheduler.go | 1 - internal/scheduler/preempting_queue_scheduler_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index f45b645d9ea..8732836cbc0 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -137,7 +137,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche fairShare = math.Max(qctx.AdjustedFairShare, fairShare) } fractionOfFairShare := actualShare / fairShare - ctx.Infof("queue=%s, fairShare=%.2f, actualShare=%.2f", qctx.Queue, fairShare, actualShare) if fractionOfFairShare <= sch.protectedFractionOfFairShare { return false } diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 0bd656e64f4..5e31e10553a 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1340,7 +1340,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { PriorityFactorByQueue: map[string]float64{ "A": 1, "B": 2, - "c": 1, + "C": 1, }, }, "DominantResourceFairness": { From 4645b539eb5986cb0225269dc25a4ed7cf913ab7 Mon Sep 17 00:00:00 2001 From: Chris Martin 
Date: Fri, 28 Jun 2024 09:28:13 +0100 Subject: [PATCH 4/4] code review comment Signed-off-by: Chris Martin --- internal/scheduler/context/context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 4849fe69fb1..9cf10b02fdf 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -187,7 +187,7 @@ func (sctx *SchedulingContext) UpdateFairShares() { for queueName, qctx := range sctx.QueueSchedulingContexts { cappedShare := 1.0 if !sctx.TotalResources.IsZero() { - cappedShare = sctx.FairnessCostProvider.WeightedCostFromAllocation(qctx.Demand, qctx.Weight) * qctx.Weight + cappedShare = sctx.FairnessCostProvider.UnweightedCostFromAllocation(qctx.Demand) } queueInfos = append(queueInfos, &queueInfo{ queueName: queueName,