From 56c9d9f1032d9144f0b2d09eea2908f06cb0ca78 Mon Sep 17 00:00:00 2001
From: JamesMurkin
Date: Mon, 22 Jul 2024 14:00:42 +0100
Subject: [PATCH] Remove unnecessary passing of priorityClasses + remove unused fields (#3801)

We pass priorityClasses around a lot, but these are largely unused at the
bottom of the stack. I've removed this passing where the priorityClass is not
used.

Also removed a few other minor unused parameters.

Signed-off-by: JamesMurkin
---
 internal/scheduler/constraints/constraints.go  |  2 +-
 .../scheduler/constraints/constraints_test.go  |  2 +-
 internal/scheduler/context/context.go          | 11 +-----
 internal/scheduler/context/context_test.go     |  4 --
 internal/scheduler/gang_scheduler.go           |  2 +-
 internal/scheduler/gang_scheduler_test.go      |  4 +-
 internal/scheduler/jobiteration.go             | 19 ++++------
 internal/scheduler/jobiteration_test.go        | 16 ++++----
 internal/scheduler/nodedb/nodedb.go            | 21 +++++------
 internal/scheduler/nodedb/nodedb_test.go       | 26 ++++++-------
 .../scheduler/preempting_queue_scheduler.go    | 37 +++++++------------
 .../preempting_queue_scheduler_test.go         | 11 +-----
 internal/scheduler/queue_scheduler_test.go     |  7 +---
 internal/scheduler/scheduler_metrics_test.go   |  2 +-
 internal/scheduler/scheduler_test.go           |  4 +-
 internal/scheduler/scheduling_algo.go          |  2 -
 internal/scheduler/simulator/simulator.go      |  4 +-
 internal/scheduler/submitcheck.go              |  2 +-
 18 files changed, 65 insertions(+), 111 deletions(-)

diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index 0a7ddbcf1bd..5a57f476a23 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -150,7 +150,7 @@ func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity { return q } -func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) { +func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) { // maximumResourcesToSchedule check.
if !isStrictlyLessOrEqual(sctx.ScheduledResources.Resources, constraints.maximumResourcesToSchedule) { return false, MaximumResourcesScheduledUnschedulableReason, nil diff --git a/internal/scheduler/constraints/constraints_test.go b/internal/scheduler/constraints/constraints_test.go index e422c23e7f6..4d749827f6c 100644 --- a/internal/scheduler/constraints/constraints_test.go +++ b/internal/scheduler/constraints/constraints_test.go @@ -144,7 +144,7 @@ func TestConstraints(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - ok, unscheduledReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue) + ok, unscheduledReason, err := tc.constraints.CheckRoundConstraints(tc.sctx) require.NoError(t, err) require.Equal(t, tc.expectedCheckRoundConstraintsReason == "", ok) require.Equal(t, tc.expectedCheckRoundConstraintsReason, unscheduledReason) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 81b63586445..086348e2fc1 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -19,7 +19,6 @@ import ( "github.com/armadaproject/armada/internal/common/armadaerrors" armadamaps "github.com/armadaproject/armada/internal/common/maps" armadaslices "github.com/armadaproject/armada/internal/common/slices" - "github.com/armadaproject/armada/internal/common/types" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/fairness" "github.com/armadaproject/armada/internal/scheduler/interfaces" @@ -36,10 +35,6 @@ type SchedulingContext struct { Finished time.Time // Pool for which we're currently scheduling jobs. Pool string - // Allowed priority classes. - PriorityClasses map[string]types.PriorityClass - // Default priority class. - DefaultPriorityClass string // Determines how fairness is computed. FairnessCostProvider fairness.FairnessCostProvider // Limits job scheduling rate globally across all queues. 
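With the PriorityClasses and DefaultPriorityClass fields removed from SchedulingContext, callers construct the context from just the pool, the fairness cost provider, the rate limiter and the total resources, as the constructor hunk below shows. A minimal sketch of the new call shape; fairnessCostProvider, totalResources, maxRate and burst are placeholders assumed to be set up as in the tests in this diff:

    sctx := schedulercontext.NewSchedulingContext(
        "pool",
        fairnessCostProvider, // fairness.FairnessCostProvider, assumed built elsewhere
        rate.NewLimiter(rate.Limit(maxRate), burst), // may be nil, as in context_test.go
        totalResources, // schedulerobjects.ResourceList for the pool
    )
    // Code that previously read sctx.PriorityClasses or sctx.DefaultPriorityClass
    // reads priority-class information from the job where needed, e.g. job.PriorityClass().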
@@ -78,8 +73,6 @@ type SchedulingContext struct { func NewSchedulingContext( pool string, - priorityClasses map[string]types.PriorityClass, - defaultPriorityClass string, fairnessCostProvider fairness.FairnessCostProvider, limiter *rate.Limiter, totalResources schedulerobjects.ResourceList, @@ -87,8 +80,6 @@ func NewSchedulingContext( return &SchedulingContext{ Started: time.Now(), Pool: pool, - PriorityClasses: priorityClasses, - DefaultPriorityClass: defaultPriorityClass, FairnessCostProvider: fairnessCostProvider, Limiter: limiter, QueueSchedulingContexts: make(map[string]*QueueSchedulingContext), @@ -822,7 +813,7 @@ func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error) return gangInfo, nil } -func JobSchedulingContextsFromJobs[J *jobdb.Job](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext { +func JobSchedulingContextsFromJobs[J *jobdb.Job](jobs []J) []*JobSchedulingContext { jctxs := make([]*JobSchedulingContext, len(jobs)) for i, job := range jobs { jctxs[i] = JobSchedulingContextFromJob(job) diff --git a/internal/scheduler/context/context_test.go b/internal/scheduler/context/context_test.go index 9ca097cd70c..dac16c728c4 100644 --- a/internal/scheduler/context/context_test.go +++ b/internal/scheduler/context/context_test.go @@ -40,8 +40,6 @@ func TestSchedulingContextAccounting(t *testing.T) { require.NoError(t, err) sctx := NewSchedulingContext( "pool", - testfixtures.TestPriorityClasses, - testfixtures.TestDefaultPriorityClass, fairnessCostProvider, nil, totalResources, @@ -243,8 +241,6 @@ func TestCalculateFairShares(t *testing.T) { require.NoError(t, err) sctx := NewSchedulingContext( "pool", - testfixtures.TestPriorityClasses, - testfixtures.TestDefaultPriorityClass, fairnessCostProvider, nil, tc.availableResources, diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index 6b611704b24..05de27a94fe 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -103,7 +103,7 @@ func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *schedulerco func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { // Exit immediately if this is a new gang and we've hit any round limits. 
if !gctx.AllJobsEvicted { - if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext, gctx.Queue); err != nil || !ok { + if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext); err != nil || !ok { return } } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index fd698b0fa05..1f105557c91 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -556,8 +556,6 @@ func TestGangScheduler(t *testing.T) { require.NoError(t, err) sctx := schedulercontext.NewSchedulingContext( "pool", - tc.SchedulingConfig.PriorityClasses, - tc.SchedulingConfig.DefaultPriorityClassName, fairnessCostProvider, rate.NewLimiter( rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), @@ -593,7 +591,7 @@ func TestGangScheduler(t *testing.T) { var actualScheduledIndices []int scheduledGangs := 0 for i, gang := range tc.Gangs { - jctxs := schedulercontext.JobSchedulingContextsFromJobs(tc.SchedulingConfig.PriorityClasses, gang) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(gang) gctx := schedulercontext.NewGangSchedulingContext(jctxs) ok, reason, err := sch.Schedule(armadacontext.Background(), gctx) require.NoError(t, err) diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go index 8f356db66e7..b814e9144a9 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration.go @@ -7,7 +7,6 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" armadaslices "github.com/armadaproject/armada/internal/common/slices" - "github.com/armadaproject/armada/internal/common/types" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/jobdb" ) @@ -106,19 +105,17 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator { // QueuedJobsIterator is an iterator over all jobs in a queue. type QueuedJobsIterator struct { - repo JobRepository - jobIds []string - priorityClasses map[string]types.PriorityClass - idx int - ctx *armadacontext.Context + repo JobRepository + jobIds []string + idx int + ctx *armadacontext.Context } -func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository, priorityClasses map[string]types.PriorityClass) *QueuedJobsIterator { +func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator { return &QueuedJobsIterator{ - jobIds: repo.GetQueueJobIds(queue), - repo: repo, - priorityClasses: priorityClasses, - ctx: ctx, + jobIds: repo.GetQueueJobIds(queue), + repo: repo, + ctx: ctx, } } diff --git a/internal/scheduler/jobiteration_test.go b/internal/scheduler/jobiteration_test.go index d714a01531c..c4f10a00bfa 100644 --- a/internal/scheduler/jobiteration_test.go +++ b/internal/scheduler/jobiteration_test.go @@ -64,7 +64,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) { ctx := armadacontext.Background() its := make([]JobIterator, 3) for i, queue := range []string{"A", "B", "C"} { - it := NewQueuedJobsIterator(ctx, queue, repo, nil) + it := NewQueuedJobsIterator(ctx, queue, repo) its[i] = it } it := NewMultiJobsIterator(its...) 
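NewQueuedJobsIterator now takes just a context, the queue name and the repository; the priorityClasses argument the iterator never read is gone. A rough usage sketch consistent with the tests below, where drainQueue is a hypothetical helper and repo is any JobRepository implementation:

    func drainQueue(ctx *armadacontext.Context, queue string, repo JobRepository) error {
        it := NewQueuedJobsIterator(ctx, queue, repo)
        for {
            jctx, err := it.Next()
            if err != nil {
                return err // e.g. context.DeadlineExceeded if ctx expires mid-iteration
            }
            if jctx == nil {
                return nil // queue exhausted
            }
            // use jctx, the scheduling context for the next queued job
        }
    }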
@@ -93,7 +93,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -115,7 +115,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -137,7 +137,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -164,7 +164,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -187,7 +187,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond) time.Sleep(20 * time.Millisecond) defer cancel() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) job, err := it.Next() assert.Nil(t, job) assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -205,7 +205,7 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) for job, err := it.Next(); job != nil; job, err = it.Next() { require.NoError(t, err) } @@ -243,7 +243,7 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) { } func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator { - return NewQueuedJobsIterator(ctx, queue, repo, nil) + return NewQueuedJobsIterator(ctx, queue, repo) } func (repo *mockJobRepository) GetQueueJobIds(queue string) []string { diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index efe0ae3d394..aaf24244154 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -861,7 +861,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s nodeCopy := node.node.DeepCopyNilKeys() for _, job := range node.evictedJobs { // Remove preempted job from node - err = nodeDb.unbindJobFromNodeInPlace(nodeDb.priorityClasses, job.JobSchedulingContext.Job, nodeCopy) + err = nodeDb.unbindJobFromNodeInPlace(job.JobSchedulingContext.Job, nodeCopy) if err != nil { return nil, err } @@ -942,7 +942,6 @@ func (nodeDb *NodeDb) bindJobToNodeInPlace(node *internaltypes.Node, job *jobdb. // the jobs' priorities to evictedPriority; they are not subtracted from AllocatedByJobId and // AllocatedByQueue. 
func (nodeDb *NodeDb) EvictJobsFromNode( - priorityClasses map[string]types.PriorityClass, jobFilter func(*jobdb.Job) bool, jobs []*jobdb.Job, node *internaltypes.Node, @@ -954,7 +953,7 @@ func (nodeDb *NodeDb) EvictJobsFromNode( continue } evicted = append(evicted, job) - if err := nodeDb.evictJobFromNodeInPlace(priorityClasses, job, node); err != nil { + if err := nodeDb.evictJobFromNodeInPlace(job, node); err != nil { return nil, nil, err } } @@ -962,7 +961,7 @@ func (nodeDb *NodeDb) EvictJobsFromNode( } // evictJobFromNodeInPlace is the in-place operation backing EvictJobsFromNode. -func (nodeDb *NodeDb) evictJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) error { +func (nodeDb *NodeDb) evictJobFromNodeInPlace(job *jobdb.Job, node *internaltypes.Node) error { jobId := job.Id() if _, ok := node.AllocatedByJobId[jobId]; !ok { return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.GetId()) @@ -1010,10 +1009,10 @@ func markAllocatable(allocatableByPriority map[int32]internaltypes.ResourceList, } // UnbindJobsFromNode returns a node with all elements of jobs unbound from it. -func (nodeDb *NodeDb) UnbindJobsFromNode(priorityClasses map[string]types.PriorityClass, jobs []*jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { +func (nodeDb *NodeDb) UnbindJobsFromNode(jobs []*jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { node = node.DeepCopyNilKeys() for _, job := range jobs { - if err := nodeDb.unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { + if err := nodeDb.unbindJobFromNodeInPlace(job, node); err != nil { return nil, err } } @@ -1021,16 +1020,16 @@ func (nodeDb *NodeDb) UnbindJobsFromNode(priorityClasses map[string]types.Priori } // UnbindJobFromNode returns a copy of node with job unbound from it. -func (nodeDb *NodeDb) UnbindJobFromNode(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { +func (nodeDb *NodeDb) UnbindJobFromNode(job *jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { node = node.DeepCopyNilKeys() - if err := nodeDb.unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { + if err := nodeDb.unbindJobFromNodeInPlace(job, node); err != nil { return nil, err } return node, nil } // unbindPodFromNodeInPlace is like UnbindJobFromNode, but doesn't make a copy of node. 
-func (nodeDb *NodeDb) unbindJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) error { +func (nodeDb *NodeDb) unbindJobFromNodeInPlace(job *jobdb.Job, node *internaltypes.Node) error { jobId := job.Id() requests := job.EfficientResourceRequirements() @@ -1178,7 +1177,7 @@ func (nodeDb *NodeDb) AddEvictedJobSchedulingContextWithTxn(txn *memdb.Txn, inde } func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[int32]string, map[int32]int) { - nodesTable, indexNameByPriority, keyIndexByPriority := nodesTableSchema(priorities, resources) + nodesTable, indexNameByPriority, keyIndexByPriority := nodesTableSchema(priorities) evictionsTable := evictionsTableSchema() return &memdb.DBSchema{ Tables: map[string]*memdb.TableSchema{ @@ -1188,7 +1187,7 @@ func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[ }, indexNameByPriority, keyIndexByPriority } -func nodesTableSchema(priorities []int32, resources []string) (*memdb.TableSchema, map[int32]string, map[int32]int) { +func nodesTableSchema(priorities []int32) (*memdb.TableSchema, map[int32]string, map[int32]int) { indexes := make(map[string]*memdb.IndexSchema, len(priorities)+1) indexes["id"] = &memdb.IndexSchema{ Name: "id", diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index dd07e25d07a..a42608d372a 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -72,7 +72,7 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { db, err := newNodeDbWithNodes(nodes) require.NoError(t, err) jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1) - jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs) for _, jctx := range jctxs { txn := db.Txn(false) jctx.SetAssignedNodeId(nodeId) @@ -97,7 +97,7 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { db, err := newNodeDbWithNodes(nodes) require.NoError(t, err) jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1) - jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs) for _, jctx := range jctxs { txn := db.Txn(false) jctx.SetAssignedNodeId("non-existent node") @@ -133,32 +133,32 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { boundNode, err := nodeDb.bindJobToNode(entry, job, job.PodRequirements().Priority) require.NoError(t, err) - unboundNode, err := nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, boundNode) + unboundNode, err := nodeDb.UnbindJobFromNode(job, boundNode) require.NoError(t, err) - unboundMultipleNode, err := nodeDb.UnbindJobsFromNode(testfixtures.TestPriorityClasses, []*jobdb.Job{job}, boundNode) + unboundMultipleNode, err := nodeDb.UnbindJobsFromNode([]*jobdb.Job{job}, boundNode) require.NoError(t, err) - evictedJobs, evictedNode, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, boundNode) + evictedJobs, evictedNode, err := nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, boundNode) require.NoError(t, err) assert.Equal(t, []*jobdb.Job{job}, evictedJobs) - evictedUnboundNode, err := nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode) + evictedUnboundNode, err := nodeDb.UnbindJobFromNode(job, evictedNode) require.NoError(t, err) 
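	// The calls above and below exercise the slimmed-down NodeDb API:
	// UnbindJobFromNode(job, node), UnbindJobsFromNode(jobs, node) and
	// EvictJobsFromNode(jobFilter, jobs, node). The priorityClasses map these
	// methods used to take was never read at this level, so it has been dropped.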
evictedBoundNode, err := nodeDb.bindJobToNode(evictedNode, job, job.PodRequirements().Priority) require.NoError(t, err) - _, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, entry) + _, _, err = nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, entry) require.Error(t, err) - _, err = nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, entry) + _, err = nodeDb.UnbindJobFromNode(job, entry) require.NoError(t, err) _, err = nodeDb.bindJobToNode(boundNode, job, job.PodRequirements().Priority) require.Error(t, err) - _, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, evictedNode) + _, _, err = nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, evictedNode) require.Error(t, err) assertNodeAccountingEqual(t, entry, unboundNode) @@ -293,7 +293,7 @@ func TestEviction(t *testing.T) { for i, job := range jobs { existingJobs[i] = job } - actualEvictions, _, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, tc.jobFilter, existingJobs, entry) + actualEvictions, _, err := nodeDb.EvictJobsFromNode(tc.jobFilter, existingJobs, entry) require.NoError(t, err) expectedEvictions := make([]*jobdb.Job, 0, len(tc.expectedEvictions)) for _, i := range tc.expectedEvictions { @@ -434,7 +434,7 @@ func TestScheduleIndividually(t *testing.T) { nodeDb, err := newNodeDbWithNodes(tc.Nodes) require.NoError(t, err) - jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, tc.Jobs) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(tc.Jobs) for i, jctx := range jctxs { nodeDbTxn := nodeDb.Txn(true) @@ -524,7 +524,7 @@ func TestScheduleMany(t *testing.T) { require.NoError(t, err) for i, jobs := range tc.Jobs { nodeDbTxn := nodeDb.Txn(true) - jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs) gctx := schedulercontext.NewGangSchedulingContext(jctxs) ok, err := nodeDb.ScheduleManyWithTxn(nodeDbTxn, gctx) require.NoError(t, err) @@ -776,7 +776,7 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs [] b.ResetTimer() for n := 0; n < b.N; n++ { - jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs) gctx := schedulercontext.NewGangSchedulingContext(jctxs) txn := nodeDb.Txn(true) _, err := nodeDb.ScheduleManyWithTxn(txn, gctx) diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 70981ac214d..8c386c5076b 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -14,7 +14,6 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" armadamaps "github.com/armadaproject/armada/internal/common/maps" armadaslices "github.com/armadaproject/armada/internal/common/slices" - "github.com/armadaproject/armada/internal/common/types" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/fairness" @@ -117,7 +116,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche NewNodeEvictor( sch.jobRepo, sch.nodeDb, - sch.schedulingContext.PriorityClasses, 
func(ctx *armadacontext.Context, job *jobdb.Job) bool { priorityClass := job.PriorityClass() if !priorityClass.Preemptible { @@ -180,7 +178,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche NewOversubscribedEvictor( sch.jobRepo, sch.nodeDb, - sch.schedulingContext.PriorityClasses, ), ) if err != nil { @@ -347,7 +344,6 @@ func (sch *PreemptingQueueScheduler) evictGangs(ctx *armadacontext.Context, txn evictor := NewFilteredEvictor( sch.jobRepo, sch.nodeDb, - sch.schedulingContext.PriorityClasses, gangNodeIds, gangJobIds, ) @@ -539,7 +535,7 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() { jobIteratorByQueue[qctx.Queue] = evictedIt } else { - queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, jobRepo, sch.schedulingContext.PriorityClasses) + queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, jobRepo) jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt) } } @@ -588,7 +584,7 @@ func (sch *PreemptingQueueScheduler) unbindJobs(jctxs []*schedulercontext.JobSch if err != nil { return err } - node, err = sch.nodeDb.UnbindJobsFromNode(sch.schedulingContext.PriorityClasses, jobsOnNode, node) + node, err = sch.nodeDb.UnbindJobsFromNode(jobsOnNode, node) if err != nil { return err } @@ -693,11 +689,10 @@ func (sch *PreemptingQueueScheduler) assertions( } type Evictor struct { - jobRepo JobRepository - nodeDb *nodedb.NodeDb - priorityClasses map[string]types.PriorityClass - nodeFilter func(*armadacontext.Context, *internaltypes.Node) bool - jobFilter func(*armadacontext.Context, *jobdb.Job) bool + jobRepo JobRepository + nodeDb *nodedb.NodeDb + nodeFilter func(*armadacontext.Context, *internaltypes.Node) bool + jobFilter func(*armadacontext.Context, *jobdb.Job) bool } type EvictorResult struct { @@ -730,13 +725,11 @@ func (er *EvictorResult) SummaryString() string { func NewNodeEvictor( jobRepo JobRepository, nodeDb *nodedb.NodeDb, - priorityClasses map[string]types.PriorityClass, jobFilter func(*armadacontext.Context, *jobdb.Job) bool, ) *Evictor { return &Evictor{ - jobRepo: jobRepo, - nodeDb: nodeDb, - priorityClasses: priorityClasses, + jobRepo: jobRepo, + nodeDb: nodeDb, nodeFilter: func(_ *armadacontext.Context, node *internaltypes.Node) bool { return len(node.AllocatedByJobId) > 0 }, @@ -749,7 +742,6 @@ func NewNodeEvictor( func NewFilteredEvictor( jobRepo JobRepository, nodeDb *nodedb.NodeDb, - priorityClasses map[string]types.PriorityClass, nodeIdsToEvict map[string]bool, jobIdsToEvict map[string]bool, ) *Evictor { @@ -757,9 +749,8 @@ func NewFilteredEvictor( return nil } return &Evictor{ - jobRepo: jobRepo, - nodeDb: nodeDb, - priorityClasses: priorityClasses, + jobRepo: jobRepo, + nodeDb: nodeDb, nodeFilter: func(_ *armadacontext.Context, node *internaltypes.Node) bool { shouldEvict := nodeIdsToEvict[node.GetId()] return shouldEvict @@ -776,16 +767,14 @@ func NewFilteredEvictor( func NewOversubscribedEvictor( jobRepo JobRepository, nodeDb *nodedb.NodeDb, - priorityClasses map[string]types.PriorityClass, ) *Evictor { // Populating overSubscribedPriorities relies on // - nodeFilter being called once before all calls to jobFilter and // - jobFilter being called for all jobs on that node before moving on to another node. 
var overSubscribedPriorities map[int32]bool return &Evictor{ - jobRepo: jobRepo, - nodeDb: nodeDb, - priorityClasses: priorityClasses, + jobRepo: jobRepo, + nodeDb: nodeDb, nodeFilter: func(_ *armadacontext.Context, node *internaltypes.Node) bool { overSubscribedPriorities = make(map[int32]bool) for p, rl := range node.AllocatableByPriority { @@ -843,7 +832,7 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev } } jobs := evi.jobRepo.GetExistingJobsByIds(jobIds) - evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(evi.priorityClasses, jobFilter, jobs, node) + evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(jobFilter, jobs, node) if err != nil { return nil, err } diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 57c4553a5a5..aff13ae00fb 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -57,8 +57,7 @@ func TestEvictOversubscribed(t *testing.T) { evictor := NewOversubscribedEvictor( NewSchedulerJobRepositoryAdapter(jobDbTxn), - nodeDb, - config.PriorityClasses) + nodeDb) result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn) require.NoError(t, err) @@ -1797,7 +1796,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { nodeId := nodeIdByJobId[job.Id()] node, err := nodeDb.GetNode(nodeId) require.NoError(t, err) - node, err = nodeDb.UnbindJobFromNode(tc.SchedulingConfig.PriorityClasses, job, node) + node, err = nodeDb.UnbindJobFromNode(job, node) require.NoError(t, err) err = nodeDb.Upsert(node) require.NoError(t, err) @@ -1837,8 +1836,6 @@ func TestPreemptingQueueScheduler(t *testing.T) { require.NoError(t, err) sctx := schedulercontext.NewSchedulingContext( "pool", - tc.SchedulingConfig.PriorityClasses, - tc.SchedulingConfig.DefaultPriorityClassName, fairnessCostProvider, limiter, tc.TotalResources, @@ -2200,8 +2197,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { require.NoError(b, err) sctx := schedulercontext.NewSchedulingContext( "pool", - tc.SchedulingConfig.PriorityClasses, - tc.SchedulingConfig.DefaultPriorityClassName, fairnessCostProvider, limiter, nodeDb.TotalResources(), @@ -2267,8 +2262,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { for n := 0; n < b.N; n++ { sctx := schedulercontext.NewSchedulingContext( "pool", - tc.SchedulingConfig.PriorityClasses, - tc.SchedulingConfig.DefaultPriorityClassName, fairnessCostProvider, limiter, nodeDb.TotalResources(), diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 5fa64fbbb4d..b77e7f8d19f 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -501,10 +501,7 @@ func TestQueueScheduler(t *testing.T) { } jobRepo := NewInMemoryJobRepository() jobRepo.EnqueueMany( - schedulercontext.JobSchedulingContextsFromJobs( - tc.SchedulingConfig.PriorityClasses, - legacySchedulerJobs, - ), + schedulercontext.JobSchedulingContextsFromJobs(legacySchedulerJobs), ) fairnessCostProvider, err := fairness.NewDominantResourceFairness( @@ -514,8 +511,6 @@ func TestQueueScheduler(t *testing.T) { require.NoError(t, err) sctx := schedulercontext.NewSchedulingContext( "pool", - tc.SchedulingConfig.PriorityClasses, - tc.SchedulingConfig.DefaultPriorityClassName, fairnessCostProvider, rate.NewLimiter( rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), diff --git a/internal/scheduler/scheduler_metrics_test.go 
b/internal/scheduler/scheduler_metrics_test.go index ae171ce52b6..f1ae54306e4 100644 --- a/internal/scheduler/scheduler_metrics_test.go +++ b/internal/scheduler/scheduler_metrics_test.go @@ -22,7 +22,7 @@ func TestAggregateJobs(t *testing.T) { testfixtures.Test1Cpu4GiJob("queue_a", testfixtures.PriorityClass0), } - actual := aggregateJobContexts(map[queuePriorityClassKey]int{}, schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, testJobs)) + actual := aggregateJobContexts(map[queuePriorityClassKey]int{}, schedulercontext.JobSchedulingContextsFromJobs(testJobs)) expected := map[queuePriorityClassKey]int{ {queue: "queue_a", priorityClass: testfixtures.PriorityClass0}: 4, diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 5578d5bcfca..f0be5629d0c 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -1436,8 +1436,8 @@ func NewSchedulerResultForTest[S ~[]T, T *jobdb.Job]( nodeIdByJobId map[string]string, ) *SchedulerResult { return &SchedulerResult{ - PreemptedJobs: schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, preemptedJobs), - ScheduledJobs: schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, scheduledJobs), + PreemptedJobs: schedulercontext.JobSchedulingContextsFromJobs(preemptedJobs), + ScheduledJobs: schedulercontext.JobSchedulingContextsFromJobs(scheduledJobs), NodeIdByJobId: nodeIdByJobId, } } diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 9e96dce55df..d71e8fd3561 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -438,8 +438,6 @@ func (l *FairSchedulingAlgo) schedulePool( } sctx := schedulercontext.NewSchedulingContext( pool, - l.schedulingConfig.PriorityClasses, - l.schedulingConfig.DefaultPriorityClassName, fairnessCostProvider, l.limiter, totalResources, diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index ab74665fbd1..d2caab2248f 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -468,8 +468,6 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { } sctx := schedulercontext.NewSchedulingContext( pool.Name, - s.schedulingConfig.PriorityClasses, - s.schedulingConfig.DefaultPriorityClassName, fairnessCostProvider, s.limiter, totalResources, @@ -850,7 +848,7 @@ func (s *Simulator) unbindRunningJob(job *jobdb.Job) error { } else if node == nil { return errors.Errorf("node %s not found", run.NodeId()) } - node, err = nodeDb.UnbindJobFromNode(s.schedulingConfig.PriorityClasses, job, node) + node, err = nodeDb.UnbindJobFromNode(job, node) if err != nil { return err } diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index f4100e53b2f..80f347d9adc 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -141,7 +141,7 @@ func (srv *SubmitChecker) Check(ctx *armadacontext.Context, jobs []*jobdb.Job) ( return nil, fmt.Errorf("executor state not loaded") } - jobContexts := schedulercontext.JobSchedulingContextsFromJobs(srv.schedulingConfig.PriorityClasses, jobs) + jobContexts := schedulercontext.JobSchedulingContextsFromJobs(jobs) results := make(map[string]schedulingResult, len(jobs)) // First, check if all jobs can be scheduled individually.
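Taken together, the downstream call chain is now free of priority-class plumbing. A closing sketch of the common shape, mirroring call sites visible in this diff; nodeDb, txn and jobs are assumed to be in scope:

    jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs) // no priorityClasses argument
    gctx := schedulercontext.NewGangSchedulingContext(jctxs)
    ok, err := nodeDb.ScheduleManyWithTxn(txn, gctx)
    if err != nil {
        return err
    }
    if !ok {
        // the gang could not be scheduled in this round
    }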